Skip to content

Commit

Permalink
feat: initial support for JSON logging
Browse files Browse the repository at this point in the history
Hook into logging machinery.

Signed-off-by: Alexey Palazhchenko <alexey.palazhchenko@talos-systems.com>
  • Loading branch information
AlekSi committed Oct 16, 2021
1 parent 68c420e commit e60469a
Show file tree
Hide file tree
Showing 20 changed files with 341 additions and 50 deletions.
35 changes: 33 additions & 2 deletions internal/app/machined/pkg/runtime/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,22 @@

package runtime

import "io"
import (
"context"
"fmt"
"io"
"time"

"go.uber.org/zap/zapcore"
)

// LoggingManager provides unified interface to publish and consume logs.
type LoggingManager interface {
// ServiceLog privides a log handler for a given service (that may not exist).
ServiceLog(service string) LogHandler

// SetSender sets the log sender for all derived log handlers.
SetSender(sender LogSender)
}

// LogOptions for LogHandler.Reader.
Expand Down Expand Up @@ -38,8 +49,28 @@ func WithTailLines(lines int) LogOption {
}
}

// LogHandler provides interface to access particular log file.
// LogHandler provides interface to access particular log source.
type LogHandler interface {
Writer() (io.WriteCloser, error)
Reader(opt ...LogOption) (io.ReadCloser, error)
}

// LogEvent represents a log message to be send.
type LogEvent struct {
Msg string
Time time.Time
Level zapcore.Level
Fields map[string]interface{}
}

// ErrDontRetry indicates that log event should not be resent.
var ErrDontRetry = fmt.Errorf("don't retry")

// LogSender provides common interface for log senders.
type LogSender interface {
// Send tries to send the log event once, exiting on success, error, or context cancelation.
// Returned error is nil on success, non-nil otherwise.
// As a special case, Send can return (possibly wrapped) ErrDontRetry if the log event should not be resent
// (if it is invalid, if it was sent partially, etc).
Send(ctx context.Context, e *LogEvent) error
}
113 changes: 109 additions & 4 deletions internal/app/machined/pkg/runtime/logging/cicrular.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
package logging

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"log"
"strings"
"sync"
"time"

"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/internal/pkg/circular"
Expand All @@ -26,22 +32,54 @@ const (

// CircularBufferLoggingManager implements logging to circular fixed size buffer.
type CircularBufferLoggingManager struct {
fallbackLogger *log.Logger

buffers sync.Map

senderRW sync.RWMutex
sender runtime.LogSender
senderChanged chan struct{}
}

// NewCircularBufferLoggingManager initializes new CircularBufferLoggingManager.
func NewCircularBufferLoggingManager() *CircularBufferLoggingManager {
return &CircularBufferLoggingManager{}
func NewCircularBufferLoggingManager(fallbackLogger *log.Logger) *CircularBufferLoggingManager {
return &CircularBufferLoggingManager{
fallbackLogger: fallbackLogger,
senderChanged: make(chan struct{}),
}
}

// ServiceLog implements runtime.LoggingManager interface.
func (manager *CircularBufferLoggingManager) ServiceLog(id string) runtime.LogHandler {
return &circularHandler{
manager: manager,
id: id,
fields: map[string]interface{}{
"component": id,
},
}
}

// SetSender implements runtime.LoggingManager interface.
func (manager *CircularBufferLoggingManager) SetSender(sender runtime.LogSender) {
manager.senderRW.Lock()

manager.sender = sender
prev := manager.senderChanged
manager.senderChanged = make(chan struct{})

manager.senderRW.Unlock()

close(prev)
}

func (manager *CircularBufferLoggingManager) getSender() (runtime.LogSender, <-chan struct{}) {
manager.senderRW.RLock()
defer manager.senderRW.RUnlock()

return manager.sender, manager.senderChanged
}

func (manager *CircularBufferLoggingManager) getBuffer(id string, create bool) (*circular.Buffer, error) {
buf, ok := manager.buffers.Load(id)
if !ok {
Expand All @@ -66,6 +104,7 @@ func (manager *CircularBufferLoggingManager) getBuffer(id string, create bool) (
type circularHandler struct {
manager *CircularBufferLoggingManager
id string
fields map[string]interface{}

buf *circular.Buffer
}
Expand All @@ -74,7 +113,7 @@ type nopCloser struct {
io.Writer
}

func (c nopCloser) Close() error {
func (nopCloser) Close() error {
return nil
}

Expand All @@ -84,10 +123,15 @@ func (handler *circularHandler) Writer() (io.WriteCloser, error) {
var err error

handler.buf, err = handler.manager.getBuffer(handler.id, true)

if err != nil {
return nil, err
}

go func() {
if err := handler.runSender(); err != nil {
handler.manager.fallbackLogger.Printf("log sender stopped: %s", err)
}
}()
}

return nopCloser{handler.buf}, nil
Expand Down Expand Up @@ -140,3 +184,64 @@ func (handler *circularHandler) Reader(opts ...runtime.LogOption) (io.ReadCloser

return r, nil
}

func (handler *circularHandler) runSender() error {
r, err := handler.Reader(runtime.WithFollow())
if err != nil {
return err
}
defer r.Close() //nolint:errcheck

scanner := bufio.NewScanner(r)
for scanner.Scan() {
l := strings.TrimSpace(scanner.Text())
if l == "" {
continue
}

handler.resend(&runtime.LogEvent{
Msg: l,
Fields: handler.fields,
})
}

return fmt.Errorf("scanner: %w", scanner.Err())
}

// resend sends and resends given event until success or ErrDontRetry error.
func (handler *circularHandler) resend(e *runtime.LogEvent) {
for {
var sender runtime.LogSender

// wait for sender to be set
for {
var changed <-chan struct{}

sender, changed = handler.manager.getSender()
if sender != nil {
break
}

handler.manager.fallbackLogger.Printf("waiting for sender at %s", time.Now())
<-changed
}

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)

err := sender.Send(ctx, e)

cancel()

if err == nil {
return
}

handler.manager.fallbackLogger.Print(err)

if errors.Is(err, runtime.ErrDontRetry) {
return
}

time.Sleep(time.Second)
}
}
3 changes: 3 additions & 0 deletions internal/app/machined/pkg/runtime/logging/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func (manager *FileLoggingManager) ServiceLog(id string) runtime.LogHandler {
}
}

// SetSender implements runtime.LoggingManager interface (by doing nothing).
func (manager *FileLoggingManager) SetSender(runtime.LogSender) {}

type fileLogHandler struct {
path string

Expand Down
9 changes: 6 additions & 3 deletions internal/app/machined/pkg/runtime/logging/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@ func NewNullLoggingManager() *NullLoggingManager {
}

// ServiceLog implements LoggingManager.
func (manager *NullLoggingManager) ServiceLog(id string) runtime.LogHandler {
func (*NullLoggingManager) ServiceLog(id string) runtime.LogHandler {
return &nullLogHandler{}
}

// SetSender implements runtime.LoggingManager interface (by doing nothing).
func (*NullLoggingManager) SetSender(runtime.LogSender) {}

type nullLogHandler struct{}

func (handler *nullLogHandler) Writer() (io.WriteCloser, error) {
func (*nullLogHandler) Writer() (io.WriteCloser, error) {
return os.OpenFile(os.DevNull, os.O_WRONLY, 0)
}

func (handler *nullLogHandler) Reader(...runtime.LogOption) (io.ReadCloser, error) {
func (*nullLogHandler) Reader(...runtime.LogOption) (io.ReadCloser, error) {
return os.OpenFile(os.DevNull, os.O_RDONLY, 0)
}
47 changes: 47 additions & 0 deletions internal/app/machined/pkg/runtime/logging/sender_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package logging

import (
"context"
"encoding/json"
"fmt"
"log"
"os"

"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
)

type jsonSender struct {
l *log.Logger
}

// NewJSON returns log sender that would eventually send logs in JSON.
// FIXME(aleksi): update comment.
func NewJSON() runtime.LogSender {
return &jsonSender{
l: log.New(os.Stdout, "JSON: ", 0),
}
}

func (j *jsonSender) Send(ctx context.Context, e *runtime.LogEvent) error {
m := make(map[string]interface{}, len(e.Fields)+3)
m["msg"] = e.Msg
m["time"] = e.Time.Unix()
m["level"] = e.Level.String()

for k, v := range e.Fields {
m[k] = v
}

b, err := json.Marshal(m)
if err != nil {
return fmt.Errorf("%w: %s", runtime.ErrDontRetry, err)
}

j.l.Printf("%s\n", b)

return nil
}
22 changes: 22 additions & 0 deletions internal/app/machined/pkg/runtime/logging/sender_null.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package logging

import (
"context"

"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
)

type nullSender struct{}

// NewNull returns log sender that does nothing.
func NewNull() runtime.LogSender {
return nullSender{}
}

func (nullSender) Send(context.Context, *runtime.LogEvent) error {
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewController() (*Controller, error) {
// TODO: this should be streaming capacity and probably some constant
e := NewEvents(1000, 10)

l := logging.NewCircularBufferLoggingManager()
l := logging.NewCircularBufferLoggingManager(log.New(os.Stdout, "machined fallback logger: ", log.Flags()))

ctlr := &Controller{
r: NewRuntime(nil, s, e, l),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (r *Runtime) CanApplyImmediate(b []byte) error {
// * .machine.network
// * .machine.certCANs
// * .machine.sysctls
// FIXME(aleksi): logging
newConfig.ClusterConfig = currentConfig.ClusterConfig
newConfig.ConfigDebug = currentConfig.ConfigDebug

Expand Down

0 comments on commit e60469a

Please sign in to comment.