Skip to content

Commit

Permalink
feat: multiple logging improvements
Browse files Browse the repository at this point in the history
Add JSON over TCP support.
Add support for multiple loggers.
Make logging configurable.

Signed-off-by: Alexey Palazhchenko <alexey.palazhchenko@talos-systems.com>
  • Loading branch information
AlekSi committed Oct 25, 2021
1 parent 1d1e1df commit 4c76865
Show file tree
Hide file tree
Showing 19 changed files with 515 additions and 157 deletions.
11 changes: 6 additions & 5 deletions hack/docgen/main.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions internal/app/machined/pkg/runtime/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ type LoggingManager interface {
// ServiceLog privides a log handler for a given service (that may not exist).
ServiceLog(service string) LogHandler

// SetSender sets the log sender for all derived log handlers
// and returns the previous one for closing.
// SetSenders sets log senders for all derived log handlers
// and returns the previous ones for closing.
//
// SetSender should be thread-safe.
SetSender(sender LogSender) LogSender
// SetSenders should be thread-safe.
SetSenders(senders []LogSender) []LogSender
}

// LogOptions for LogHandler.Reader.
Expand Down
95 changes: 58 additions & 37 deletions internal/app/machined/pkg/runtime/logging/cicrular.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"sync"
"time"

"github.com/talos-systems/go-debug"

"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/internal/pkg/circular"
"github.com/talos-systems/talos/pkg/tail"
Expand All @@ -36,16 +38,16 @@ type CircularBufferLoggingManager struct {

buffers sync.Map

senderRW sync.RWMutex
sender runtime.LogSender
senderChanged chan struct{}
sendersRW sync.RWMutex
senders []runtime.LogSender
sendersChanged chan struct{}
}

// NewCircularBufferLoggingManager initializes new CircularBufferLoggingManager.
func NewCircularBufferLoggingManager(fallbackLogger *log.Logger) *CircularBufferLoggingManager {
return &CircularBufferLoggingManager{
fallbackLogger: fallbackLogger,
senderChanged: make(chan struct{}),
sendersChanged: make(chan struct{}),
}
}

Expand All @@ -61,28 +63,38 @@ func (manager *CircularBufferLoggingManager) ServiceLog(id string) runtime.LogHa
}
}

// SetSender implements runtime.LoggingManager interface.
func (manager *CircularBufferLoggingManager) SetSender(sender runtime.LogSender) runtime.LogSender {
manager.senderRW.Lock()
// SetSenders implements runtime.LoggingManager interface.
func (manager *CircularBufferLoggingManager) SetSenders(senders []runtime.LogSender) []runtime.LogSender {
manager.sendersRW.Lock()

prevChanged := manager.senderChanged
manager.senderChanged = make(chan struct{})
prevChanged := manager.sendersChanged
manager.sendersChanged = make(chan struct{})

prevSender := manager.sender
manager.sender = sender
prevSenders := manager.senders
manager.senders = senders

manager.senderRW.Unlock()
manager.sendersRW.Unlock()

close(prevChanged)

return prevSender
return prevSenders
}

func (manager *CircularBufferLoggingManager) getSender() (runtime.LogSender, <-chan struct{}) {
manager.senderRW.RLock()
defer manager.senderRW.RUnlock()
// getSenders waits for senders to be set and returns them.
func (manager *CircularBufferLoggingManager) getSenders() []runtime.LogSender {
for {
manager.sendersRW.RLock()

senders, changed := manager.senders, manager.sendersChanged

return manager.sender, manager.senderChanged
manager.sendersRW.RUnlock()

if len(senders) > 0 {
return senders
}

<-changed
}
}

func (manager *CircularBufferLoggingManager) getBuffer(id string, create bool) (*circular.Buffer, error) {
Expand Down Expand Up @@ -133,8 +145,8 @@ func (handler *circularHandler) Writer() (io.WriteCloser, error) {
}

go func() {
if err := handler.runSender(); err != nil {
handler.manager.fallbackLogger.Printf("log sender stopped: %s", err)
if err := handler.runSenders(); err != nil {
handler.manager.fallbackLogger.Printf("log senders stopped: %s", err)
}
}()
}
Expand Down Expand Up @@ -190,7 +202,7 @@ func (handler *circularHandler) Reader(opts ...runtime.LogOption) (io.ReadCloser
return r, nil
}

func (handler *circularHandler) runSender() error {
func (handler *circularHandler) runSenders() error {
r, err := handler.Reader(runtime.WithFollow())
if err != nil {
return err
Expand Down Expand Up @@ -222,34 +234,43 @@ func (handler *circularHandler) runSender() error {
// resend sends and resends given event until success or ErrDontRetry error.
func (handler *circularHandler) resend(e *runtime.LogEvent) {
for {
var sender runtime.LogSender
senders := handler.manager.getSenders()

// wait for sender to be set
for {
var changed <-chan struct{}
sendCtx, sendCancel := context.WithTimeout(context.TODO(), 5*time.Second)
sendErrors := make(chan error, len(senders))

sender, changed = handler.manager.getSender()
if sender != nil {
break
}
for _, sender := range senders {
sender := sender

<-changed
go func() {
sendErrors <- sender.Send(sendCtx, e)
}()
}

sendCtx, sendCancel := context.WithTimeout(context.TODO(), 5*time.Second)
var dontRetry bool

err := sender.Send(sendCtx, e)
for range senders {
err := <-sendErrors

sendCancel()
// don't retry if at least one sender succeed to avoid implementing per-sender queue, etc
if err == nil {
dontRetry = true

if err == nil {
return
continue
}

if debug.Enabled {
handler.manager.fallbackLogger.Print(err)
}

if errors.Is(err, runtime.ErrDontRetry) {
dontRetry = true
}
}

// TODO(aleksi): remove or make less noisy
handler.manager.fallbackLogger.Print(err)
sendCancel()

if errors.Is(err, runtime.ErrDontRetry) {
if dontRetry {
return
}

Expand Down
4 changes: 2 additions & 2 deletions internal/app/machined/pkg/runtime/logging/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func (manager *FileLoggingManager) ServiceLog(id string) runtime.LogHandler {
}
}

// SetSender implements runtime.LoggingManager interface (by doing nothing).
func (manager *FileLoggingManager) SetSender(runtime.LogSender) runtime.LogSender {
// SetSenders implements runtime.LoggingManager interface (by doing nothing).
func (manager *FileLoggingManager) SetSenders([]runtime.LogSender) []runtime.LogSender {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/app/machined/pkg/runtime/logging/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func (*NullLoggingManager) ServiceLog(id string) runtime.LogHandler {
return &nullLogHandler{}
}

// SetSender implements runtime.LoggingManager interface (by doing nothing).
func (*NullLoggingManager) SetSender(runtime.LogSender) runtime.LogSender {
// SetSenders implements runtime.LoggingManager interface (by doing nothing).
func (*NullLoggingManager) SetSenders([]runtime.LogSender) []runtime.LogSender {
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,32 @@ import (
"encoding/json"
"fmt"
"net"
"net/url"
"time"

"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
)

type jsonSender struct {
net string
addr string
type jsonLinesSender struct {
endpoint *url.URL

sema chan struct{}
conn net.Conn
}

// NewJSON returns log sender that sends logs in JSON over UDP, one message per packet.
func NewJSON(addr string) runtime.LogSender {
// NewJSONLines returns log sender that sends logs in JSON over TCP (newline-delimited)
// or UDP (one message per packet).
func NewJSONLines(endpoint *url.URL) runtime.LogSender {
sema := make(chan struct{}, 1)
sema <- struct{}{}

// It should be easy to add TCP support of that's requested.
net := "udp"

return &jsonSender{
net: net,
addr: addr,
sema: sema,
return &jsonLinesSender{
endpoint: endpoint,
sema: sema,
}
}

func (j jsonSender) tryLock(ctx context.Context) (unlock func()) {
func (j *jsonLinesSender) tryLock(ctx context.Context) (unlock func()) {
select {
case <-j.sema:
unlock = func() { j.sema <- struct{}{} }
Expand All @@ -48,7 +45,7 @@ func (j jsonSender) tryLock(ctx context.Context) (unlock func()) {
return
}

func (j *jsonSender) marshalJSON(e *runtime.LogEvent) ([]byte, error) {
func (j *jsonLinesSender) marshalJSON(e *runtime.LogEvent) ([]byte, error) {
m := make(map[string]interface{}, len(e.Fields)+3)
for k, v := range e.Fields {
m[k] = v
Expand All @@ -62,12 +59,16 @@ func (j *jsonSender) marshalJSON(e *runtime.LogEvent) ([]byte, error) {
}

// Send implements LogSender interface.
func (j *jsonSender) Send(ctx context.Context, e *runtime.LogEvent) error {
func (j *jsonLinesSender) Send(ctx context.Context, e *runtime.LogEvent) error {
b, err := j.marshalJSON(e)
if err != nil {
return fmt.Errorf("%w: %s", runtime.ErrDontRetry, err)
}

if j.endpoint.Scheme == "tcp" {
b = append(b, '\n')
}

unlock := j.tryLock(ctx)
if unlock == nil {
return ctx.Err()
Expand All @@ -77,7 +78,7 @@ func (j *jsonSender) Send(ctx context.Context, e *runtime.LogEvent) error {

// Connect (or "connect" for UDP) if no connection is established already.
if j.conn == nil {
conn, err := new(net.Dialer).DialContext(ctx, j.net, j.addr)
conn, err := new(net.Dialer).DialContext(ctx, j.endpoint.Scheme, j.endpoint.Host)
if err != nil {
return err
}
Expand All @@ -93,6 +94,7 @@ func (j *jsonSender) Send(ctx context.Context, e *runtime.LogEvent) error {
j.conn.Close() //nolint:errcheck
j.conn = nil

// skip partially sent events to avoid partial duplicates in the receiver
if n > 0 {
err = fmt.Errorf("%w: %s", runtime.ErrDontRetry, err)
}
Expand All @@ -104,7 +106,7 @@ func (j *jsonSender) Send(ctx context.Context, e *runtime.LogEvent) error {
}

// Close implements LogSender interface.
func (j *jsonSender) Close(ctx context.Context) error {
func (j *jsonLinesSender) Close(ctx context.Context) error {
unlock := j.tryLock(ctx)
if unlock == nil {
return ctx.Err()
Expand Down
28 changes: 0 additions & 28 deletions internal/app/machined/pkg/runtime/logging/sender_null.go

This file was deleted.

11 changes: 6 additions & 5 deletions internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,22 @@ func (r *Runtime) CanApplyImmediate(b []byte) error {
}

// the config changes allowed to be applied immediately are:
// * cluster config
// * .machine.debug
// * .debug
// * .cluster
// * .machine.time
// * .machine.network
// * .machine.certCANs
// * .machine.network
// * .machine.sysctls
// FIXME(aleksi): logging
newConfig.ClusterConfig = currentConfig.ClusterConfig
// * .machine.logging
newConfig.ConfigDebug = currentConfig.ConfigDebug
newConfig.ClusterConfig = currentConfig.ClusterConfig

if newConfig.MachineConfig != nil && currentConfig.MachineConfig != nil {
newConfig.MachineConfig.MachineTime = currentConfig.MachineConfig.MachineTime
newConfig.MachineConfig.MachineCertSANs = currentConfig.MachineConfig.MachineCertSANs
newConfig.MachineConfig.MachineNetwork = currentConfig.MachineConfig.MachineNetwork
newConfig.MachineConfig.MachineSysctls = currentConfig.MachineConfig.MachineSysctls
newConfig.MachineConfig.MachineLogging = currentConfig.MachineConfig.MachineLogging
}

if !reflect.DeepEqual(currentConfig, newConfig) {
Expand Down

0 comments on commit 4c76865

Please sign in to comment.