Skip to content

Commit

Permalink
feat: send JSON logs over UDP
Browse files Browse the repository at this point in the history
Confirmed to work with Elastic Filebeat.

Signed-off-by: Alexey Palazhchenko <alexey.palazhchenko@talos-systems.com>
  • Loading branch information
AlekSi committed Oct 18, 2021
1 parent 6d44587 commit 5237fdc
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 40 deletions.
15 changes: 13 additions & 2 deletions internal/app/machined/pkg/runtime/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ type LoggingManager interface {
// ServiceLog privides a log handler for a given service (that may not exist).
ServiceLog(service string) LogHandler

// SetSender sets the log sender for all derived log handlers.
SetSender(sender LogSender)
// SetSender sets the log sender for all derived log handlers
// and returns the previous one for closing.
//
// SetSender should be thread-safe.
SetSender(sender LogSender) LogSender
}

// LogOptions for LogHandler.Reader.
Expand Down Expand Up @@ -69,8 +72,16 @@ var ErrDontRetry = fmt.Errorf("don't retry")
// LogSender provides common interface for log senders.
type LogSender interface {
// Send tries to send the log event once, exiting on success, error, or context cancelation.
//
// Returned error is nil on success, non-nil otherwise.
// As a special case, Send can return (possibly wrapped) ErrDontRetry if the log event should not be resent
// (if it is invalid, if it was sent partially, etc).
//
// Send should be thread-safe.
Send(ctx context.Context, e *LogEvent) error

// Close stops the sender gracefully if possible, or forcefully on context cancelation.
//
// Close should be thread-safe.
Close(ctx context.Context) error
}
25 changes: 16 additions & 9 deletions internal/app/machined/pkg/runtime/logging/cicrular.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,20 @@ func (manager *CircularBufferLoggingManager) ServiceLog(id string) runtime.LogHa
}

// SetSender implements runtime.LoggingManager interface.
func (manager *CircularBufferLoggingManager) SetSender(sender runtime.LogSender) {
func (manager *CircularBufferLoggingManager) SetSender(sender runtime.LogSender) runtime.LogSender {
manager.senderRW.Lock()

manager.sender = sender
prev := manager.senderChanged
prevChanged := manager.senderChanged
manager.senderChanged = make(chan struct{})

prevSender := manager.sender
manager.sender = sender

manager.senderRW.Unlock()

close(prev)
close(prevChanged)

return prevSender
}

func (manager *CircularBufferLoggingManager) getSender() (runtime.LogSender, <-chan struct{}) {
Expand Down Expand Up @@ -199,10 +203,13 @@ func (handler *circularHandler) runSender() error {
continue
}

handler.resend(&runtime.LogEvent{
// TODO(aleksi): extract fields from msg there or in jsonSender
e := &runtime.LogEvent{
Msg: l,
Fields: handler.fields,
})
}

handler.resend(e)
}

return fmt.Errorf("scanner: %w", scanner.Err())
Expand All @@ -226,11 +233,11 @@ func (handler *circularHandler) resend(e *runtime.LogEvent) {
<-changed
}

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
sendCtx, sendCancel := context.WithTimeout(context.TODO(), 5*time.Second)

err := sender.Send(ctx, e)
err := sender.Send(sendCtx, e)

cancel()
sendCancel()

if err == nil {
return
Expand Down
4 changes: 3 additions & 1 deletion internal/app/machined/pkg/runtime/logging/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ func (manager *FileLoggingManager) ServiceLog(id string) runtime.LogHandler {
}

// SetSender implements runtime.LoggingManager interface (by doing nothing).
func (manager *FileLoggingManager) SetSender(runtime.LogSender) {}
func (manager *FileLoggingManager) SetSender(runtime.LogSender) runtime.LogSender {
return nil
}

type fileLogHandler struct {
path string
Expand Down
4 changes: 3 additions & 1 deletion internal/app/machined/pkg/runtime/logging/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ func (*NullLoggingManager) ServiceLog(id string) runtime.LogHandler {
}

// SetSender implements runtime.LoggingManager interface (by doing nothing).
func (*NullLoggingManager) SetSender(runtime.LogSender) {}
func (*NullLoggingManager) SetSender(runtime.LogSender) runtime.LogSender {
return nil
}

type nullLogHandler struct{}

Expand Down
101 changes: 93 additions & 8 deletions internal/app/machined/pkg/runtime/logging/sender_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,51 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"net"

"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
)

type jsonSender struct {
l *log.Logger
net string
addr string

sema chan struct{}
conn net.Conn
}

// NewJSON returns log sender that would eventually send logs in JSON.
// FIXME(aleksi): update comment.
func NewJSON() runtime.LogSender {
// NewJSON returns log sender that sends logs in JSON over UDP, one message per packet.
func NewJSON(addr string) runtime.LogSender {
sema := make(chan struct{}, 1)
sema <- struct{}{}

// It should be easy to add TCP support of that's requested.
net := "udp"

return &jsonSender{
l: log.New(os.Stdout, "JSON: ", 0),
net: net,
addr: addr,
sema: sema,
}
}

func (j jsonSender) tryLock(ctx context.Context) (unlock func()) {
select {
case <-j.sema:
unlock = func() { j.sema <- struct{}{} }
case <-ctx.Done():
unlock = nil
}

return
}

// Send implements LogSender interface.
func (j *jsonSender) Send(ctx context.Context, e *runtime.LogEvent) error {
m := make(map[string]interface{}, len(e.Fields)+3)

// TODO(aleksi): extract fields from msg there or in circularHandler

m["msg"] = e.Msg
m["time"] = e.Time.Unix()
m["level"] = e.Level.String()
Expand All @@ -41,7 +66,67 @@ func (j *jsonSender) Send(ctx context.Context, e *runtime.LogEvent) error {
return fmt.Errorf("%w: %s", runtime.ErrDontRetry, err)
}

j.l.Printf("%s\n", b)
unlock := j.tryLock(ctx)
if unlock == nil {
return ctx.Err()
}

defer unlock()

// Connect (or "connect" for UDP) if no connection is established already.
if j.conn == nil {
conn, err := new(net.Dialer).DialContext(ctx, j.net, j.addr)
if err != nil {
return err
}

j.conn = conn
}

d, _ := ctx.Deadline()
j.conn.SetWriteDeadline(d) //nolint:errcheck

// Close connection on send error.
if n, err := j.conn.Write(b); err != nil {
j.conn.Close() //nolint:errcheck
j.conn = nil

if n > 0 {
err = fmt.Errorf("%w: %s", runtime.ErrDontRetry, err)
}

return err
}

return nil
}

// Close implements LogSender interface.
func (j *jsonSender) Close(ctx context.Context) error {
unlock := j.tryLock(ctx)
if unlock == nil {
return ctx.Err()
}

defer unlock()

if j.conn == nil {
return nil
}

conn := j.conn
j.conn = nil

closed := make(chan error, 1)

go func() {
closed <- conn.Close()
}()

select {
case <-ctx.Done():
return ctx.Err()
case err := <-closed:
return err
}
}
6 changes: 6 additions & 0 deletions internal/app/machined/pkg/runtime/logging/sender_null.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ func NewNull() runtime.LogSender {
return nullSender{}
}

// Send implements LogSender interface (by doing nothing).
func (nullSender) Send(context.Context, *runtime.LogEvent) error {
return nil
}

// Close implements LogSender interface (by doing nothing).
func (nullSender) Close(context.Context) error {
return nil
}
52 changes: 33 additions & 19 deletions internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package v1alpha2

import (
"context"
"time"

"github.com/cosi-project/runtime/pkg/controller"
osruntime "github.com/cosi-project/runtime/pkg/controller/runtime"
Expand All @@ -24,7 +25,7 @@ import (
"github.com/talos-systems/talos/internal/app/machined/pkg/controllers/perf"
runtimecontrollers "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/runtime"
"github.com/talos-systems/talos/internal/app/machined/pkg/controllers/secrets"
"github.com/talos-systems/talos/internal/app/machined/pkg/controllers/time"
timecontrollers "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/time"
"github.com/talos-systems/talos/internal/app/machined/pkg/controllers/v1alpha1"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
runtimelogging "github.com/talos-systems/talos/internal/app/machined/pkg/runtime/logging"
Expand Down Expand Up @@ -78,7 +79,7 @@ func (ctrl *Controller) Run(ctx context.Context) error {
// V1Events
V1Alpha1Events: ctrl.v1alpha1Runtime.Events(),
},
&time.SyncController{
&timecontrollers.SyncController{
V1Alpha1Mode: ctrl.v1alpha1Runtime.State().Platform().Mode(),
},
&cluster.AffiliateMergeController{},
Expand Down Expand Up @@ -217,26 +218,39 @@ func (ctrl *Controller) watchMachineConfig(ctx context.Context) {
return
}

newLogLevel := zapcore.InfoLevel
if cfg.Debug() {
newLogLevel = zapcore.DebugLevel
}
ctrl.updateLoggingConfig(ctx, cfg, &loggingEnabled)
}
}

if newLogLevel != ctrl.consoleLogLevel.Level() {
ctrl.logger.Info("setting console log level", zap.Stringer("level", newLogLevel))
ctrl.consoleLogLevel.SetLevel(newLogLevel)
}
func (ctrl *Controller) updateLoggingConfig(ctx context.Context, cfg talosconfig.Provider, prevLoggingEnabled *bool) {
newLogLevel := zapcore.InfoLevel
if cfg.Debug() {
newLogLevel = zapcore.DebugLevel
}

if newLoggingEnabled := cfg.Machine().Features().LoggingEnabled(); newLoggingEnabled != loggingEnabled {
loggingEnabled = newLoggingEnabled
if newLogLevel != ctrl.consoleLogLevel.Level() {
ctrl.logger.Info("setting console log level", zap.Stringer("level", newLogLevel))
ctrl.consoleLogLevel.SetLevel(newLogLevel)
}

if newLoggingEnabled {
ctrl.logger.Info("enabling JSON logging")
ctrl.loggingManager.SetSender(runtimelogging.NewJSON())
} else {
ctrl.logger.Info("disabling JSON logging")
ctrl.loggingManager.SetSender(runtimelogging.NewNull())
}
if newLoggingEnabled := cfg.Machine().Features().LoggingEnabled(); newLoggingEnabled != *prevLoggingEnabled {
*prevLoggingEnabled = newLoggingEnabled

var prev runtime.LogSender

if newLoggingEnabled {
ctrl.logger.Info("enabling JSON logging")
prev = ctrl.loggingManager.SetSender(runtimelogging.NewJSON("127.0.0.1:12345"))
} else {
ctrl.logger.Info("disabling JSON logging")
prev = ctrl.loggingManager.SetSender(runtimelogging.NewNull())
}

if prev != nil {
closeCtx, closeCancel := context.WithTimeout(ctx, 3*time.Second)
err := prev.Close(closeCtx)
ctrl.logger.Info("log sender closed", zap.Error(err))
closeCancel()
}
}
}

0 comments on commit 5237fdc

Please sign in to comment.