Skip to content

Commit

Permalink
feat: kernel log (kmsg) delivery controller
Browse files Browse the repository at this point in the history
This controller is enabled when `talos.logging.kernel=` kernel arg is
passed. It will try to send logs to the endpoint as JSON-over-TCP (or
UDP-over-TCP, depends on the endpoint) as structured messages.

Example (from test implementation):

```
2021-11-26T19:53:21.912+0300	INFO	siderolink-agent/log_receiver.go:23	kernel log message	{"src_address": "fdae:41e4:649b:9303:680a:dfab:f7fa:ea00", "msg": {"clock":6252819,"facility":"user","msg":"[talos] task startAllServices (1/1): waiting for 6 services\n","priority":"warning","seq":711,"talos-level":"warn","talos-time":"2021-11-26T16:53:21.3258698Z"}}
```

Fixes #4455

See also siderolabs/siderolink#4

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
  • Loading branch information
smira committed Nov 29, 2021
1 parent f314978 commit bf4c81e
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ require (
github.com/talos-systems/go-smbios v0.1.1-0.20211122130416-fd5ec8ce4873
github.com/talos-systems/grpc-proxy v0.2.0
github.com/talos-systems/net v0.3.1-0.20211112122313-0abe5bdae8f8
github.com/talos-systems/siderolink v0.0.0-20211119180852-0755b24d4682
github.com/talos-systems/siderolink v0.0.0-20211125204118-d86cdd59ee7a
github.com/talos-systems/talos/pkg/machinery v0.14.0-alpha.1.0.20211118180932-1ffa8e048008
github.com/u-root/u-root v7.0.0+incompatible
github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1089,8 +1089,8 @@ github.com/talos-systems/grpc-proxy v0.2.0 h1:DN75bLfaW4xfhq0r0mwFRnfGhSB+HPhK1L
github.com/talos-systems/grpc-proxy v0.2.0/go.mod h1:sm97Vc/z2cok3pu6ruNeszQej4KDxFrDgfWs4C1mtC4=
github.com/talos-systems/net v0.3.1-0.20211112122313-0abe5bdae8f8 h1:oT2MASZ8V3DuZbhaJWJ8oZ373zfmgXpvw2xLHM5cOYk=
github.com/talos-systems/net v0.3.1-0.20211112122313-0abe5bdae8f8/go.mod h1:zhcGixNJz9dgwFiUwc7gkkAqdVqXagU1SNNoIVXYKGo=
github.com/talos-systems/siderolink v0.0.0-20211119180852-0755b24d4682 h1:0rXuO5pwwqXnNR/q33iB8vaAaSPNLKbS/d9PwkCFQkE=
github.com/talos-systems/siderolink v0.0.0-20211119180852-0755b24d4682/go.mod h1:e3PMKivoq1tpggh8esTxLQrKiZvPgPe060/ukWcJOJw=
github.com/talos-systems/siderolink v0.0.0-20211125204118-d86cdd59ee7a h1:IFPuVRfl3/Mf+eh3Hqsro1Wsb2Ng31jnNlWRm7OeC9U=
github.com/talos-systems/siderolink v0.0.0-20211125204118-d86cdd59ee7a/go.mod h1:bEGwDYl9QgC3oZ4kdnJTuR2HX/XlUhxZjx/QAakKuBc=
github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down
6 changes: 5 additions & 1 deletion internal/app/machined/internal/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

// RunInstallerContainer performs an installation via the installer container.
//
//nolint:gocyclo
//nolint:gocyclo,cyclop
func RunInstallerContainer(disk, platform, ref string, configBytes []byte, reg config.Registries, opts ...Option) error {
options := DefaultInstallOptions()

Expand Down Expand Up @@ -113,6 +113,10 @@ func RunInstallerContainer(disk, platform, ref string, configBytes []byte, reg c
args = append(args, "--extra-kernel-arg", fmt.Sprintf("%s=%s", constants.KernelParamEventsSink, *c))
}

if c := procfs.ProcCmdline().Get(constants.KernelParamLoggingKernel).First(); c != nil {
args = append(args, "--extra-kernel-arg", fmt.Sprintf("%s=%s", constants.KernelParamLoggingKernel, *c))
}

specOpts := []oci.SpecOpts{
oci.WithImageConfig(img),
oci.WithProcessArgs(args...),
Expand Down
182 changes: 182 additions & 0 deletions internal/app/machined/pkg/controllers/runtime/kmsg_log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package runtime

import (
"context"
"fmt"
"net/url"
"time"

"github.com/AlekSi/pointer"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"github.com/talos-systems/go-kmsg"
"github.com/talos-systems/go-procfs/procfs"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime/logging"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/machinery/resources/network"
)

const drainTimeout = 100 * time.Millisecond

// KmsgLogDeliveryController watches events and forwards them to the events sink server
// if it's configured.
type KmsgLogDeliveryController struct {
Cmdline *procfs.Cmdline
Drainer *runtime.Drainer

drainSub *runtime.DrainSubscription
}

// Name implements controller.Controller interface.
func (ctrl *KmsgLogDeliveryController) Name() string {
return "runtime.KmsgLogDeliveryController"
}

// Inputs implements controller.Controller interface.
func (ctrl *KmsgLogDeliveryController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: network.NamespaceName,
Type: network.StatusType,
ID: pointer.ToString(network.StatusID),
Kind: controller.InputWeak,
},
}
}

// Outputs implements controller.Controller interface.
func (ctrl *KmsgLogDeliveryController) Outputs() []controller.Output {
return nil
}

// Run implements controller.Controller interface.
//
//nolint:gocyclo,cyclop
func (ctrl *KmsgLogDeliveryController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) (err error) {
if ctrl.Cmdline == nil || ctrl.Cmdline.Get(constants.KernelParamLoggingKernel).First() == nil {
return nil
}

for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
}

var netStatus resource.Resource

netStatus, err = r.Get(ctx, resource.NewMetadata(network.NamespaceName, network.StatusType, network.StatusID, resource.VersionUndefined))
if err != nil {
if state.IsNotFoundError(err) {
// no network state yet
continue
}

return fmt.Errorf("error reading network status: %w", err)
}

if !netStatus.(*network.Status).TypedSpec().AddressReady {
// wait for address
continue
}

break
}

if ctrl.drainSub == nil {
ctrl.drainSub = ctrl.Drainer.Subscribe()
}

destURL, err := url.Parse(*ctrl.Cmdline.Get(constants.KernelParamLoggingKernel).First())
if err != nil {
return fmt.Errorf("error parsing %q: %w", constants.KernelParamLoggingKernel, err)
}

sender := logging.NewJSONLines(destURL)
defer sender.Close(ctx) //nolint:errcheck

reader, err := kmsg.NewReader(kmsg.Follow())
if err != nil {
return fmt.Errorf("error reading kernel messages: %w", err)
}

defer reader.Close() //nolint:errcheck

kmsgCh := reader.Scan(ctx)

var (
drainTimer *time.Timer
drainTimerCh <-chan time.Time
)

for {
var msg kmsg.Packet

select {
case <-ctx.Done():
ctrl.drainSub.Cancel()

return nil
case msg = <-kmsgCh:
if drainTimer != nil {
// if draining, reset the timer as there's a new message
if !drainTimer.Stop() {
<-drainTimer.C
}

drainTimer.Reset(drainTimeout)
}
case <-ctrl.drainSub.EventCh():
// drain started, assume that ksmg is drained if there're no new messages in drainTimeout
drainTimer = time.NewTimer(drainTimeout)
drainTimerCh = drainTimer.C
case <-drainTimerCh:
ctrl.drainSub.Cancel()

return nil
}

if msg.Err != nil {
return fmt.Errorf("error receiving kernel logs: %w", msg.Err)
}

if err = sender.Send(ctx, &runtime.LogEvent{
Msg: msg.Message.Message,
Time: msg.Message.Timestamp,
Level: kmsgPriorityToLevel(msg.Message.Priority),
Fields: map[string]interface{}{
"facility": msg.Message.Facility.String(),
"seq": msg.Message.SequenceNumber,
"clock": msg.Message.Clock,
"priority": msg.Message.Priority.String(),
},
}); err != nil {
return fmt.Errorf("error sending logs: %w", err)
}
}
}

func kmsgPriorityToLevel(pri kmsg.Priority) zapcore.Level {
switch pri {
case kmsg.Alert, kmsg.Crit, kmsg.Emerg, kmsg.Err:
return zapcore.ErrorLevel
case kmsg.Debug:
return zapcore.DebugLevel
case kmsg.Info, kmsg.Notice:
return zapcore.InfoLevel
case kmsg.Warning:
return zapcore.WarnLevel
default:
return zapcore.ErrorLevel
}
}
158 changes: 158 additions & 0 deletions internal/app/machined/pkg/controllers/runtime/kmsg_log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package runtime_test

import (
"context"
"fmt"
"log"
"sync"
"testing"
"time"

"github.com/cosi-project/runtime/pkg/controller/runtime"
"github.com/cosi-project/runtime/pkg/state"
"github.com/cosi-project/runtime/pkg/state/impl/inmem"
"github.com/cosi-project/runtime/pkg/state/impl/namespaced"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-procfs/procfs"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/siderolink/pkg/logreceiver"
"inet.af/netaddr"

controllerruntime "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/runtime"
talosruntime "github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/pkg/logging"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/machinery/resources/network"
)

type logHandler struct {
mu sync.Mutex
count int
}

// HandleLog implements logreceiver.Handler.
func (s *logHandler) HandleLog(srcAddr netaddr.IP, msg map[string]interface{}) {
s.mu.Lock()
defer s.mu.Unlock()

s.count++
}

func (s *logHandler) getCount() int {
s.mu.Lock()
defer s.mu.Unlock()

return s.count
}

type KmsgLogDeliverySuite struct {
suite.Suite

state state.State
cmdline *procfs.Cmdline

runtime *runtime.Runtime
drainer *talosruntime.Drainer
wg sync.WaitGroup

ctx context.Context
ctxCancel context.CancelFunc

handler *logHandler

srv *logreceiver.Server
}

func (suite *KmsgLogDeliverySuite) SetupTest() {
suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 3*time.Minute)

suite.state = state.WrapCore(namespaced.NewState(inmem.Build))

logger := logging.Wrap(log.Writer())

var err error

suite.runtime, err = runtime.NewRuntime(suite.state, logger)
suite.Require().NoError(err)

suite.handler = &logHandler{}

suite.srv, err = logreceiver.NewServer(logger, "localhost:4001", suite.handler.HandleLog)
suite.Require().NoError(err)

go func() {
suite.srv.Serve() //nolint:errcheck
}()

suite.cmdline = procfs.NewCmdline(fmt.Sprintf("%s=%s", constants.KernelParamLoggingKernel, "tcp://localhost:4001"))
suite.drainer = talosruntime.NewDrainer()

suite.Require().NoError(suite.runtime.RegisterController(&controllerruntime.KmsgLogDeliveryController{
Cmdline: suite.cmdline,
Drainer: suite.drainer,
}))

status := network.NewStatus(network.NamespaceName, network.StatusID)
status.TypedSpec().AddressReady = true

suite.Require().NoError(suite.state.Create(suite.ctx, status))
}

func (suite *KmsgLogDeliverySuite) startRuntime() {
suite.wg.Add(1)

go func() {
defer suite.wg.Done()

suite.Assert().NoError(suite.runtime.Run(suite.ctx))
}()
}

func (suite *KmsgLogDeliverySuite) TestDelivery() {
suite.startRuntime()

// controller should deliver some kernel logs from host's kmsg buffer
err := retry.Constant(time.Second*5, retry.WithUnits(time.Millisecond*100)).Retry(func() error {
if suite.handler.getCount() == 0 {
return retry.ExpectedErrorf("no logs received")
}

return nil
})
suite.Require().NoError(err)
}

func (suite *KmsgLogDeliverySuite) TestDrain() {
suite.startRuntime()

// wait for controller to start delivering some logs
err := retry.Constant(time.Second*5, retry.WithUnits(time.Millisecond*100)).Retry(func() error {
if suite.handler.getCount() == 0 {
return retry.ExpectedErrorf("no logs received")
}

return nil
})
suite.Require().NoError(err)

// drain should be successful, i.e. controller should stop on its own before context is canceled
suite.Assert().NoError(suite.drainer.Drain(suite.ctx))
}

func (suite *KmsgLogDeliverySuite) TearDownTest() {
suite.T().Log("tear down")

suite.srv.Stop()

suite.ctxCancel()

suite.wg.Wait()
}

func TestKmsgLogDeliverySuite(t *testing.T) {
suite.Run(t, new(KmsgLogDeliverySuite))
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ func (ctrl *Controller) Run(ctx context.Context, drainer *runtime.Drainer) error
V1Alpha1Mode: ctrl.v1alpha1Runtime.State().Platform().Mode(),
},
&runtimecontrollers.KernelParamSpecController{},
&runtimecontrollers.KmsgLogDeliveryController{
Cmdline: procfs.ProcCmdline(),
Drainer: drainer,
},
&secrets.APIController{},
&secrets.APICertSANsController{},
&secrets.EtcdController{},
Expand Down

0 comments on commit bf4c81e

Please sign in to comment.