Skip to content

Commit

Permalink
feat: extract JSON fields from log lines
Browse files Browse the repository at this point in the history
Confirmed to work with Elastic Filebeat.

Signed-off-by: Alexey Palazhchenko <alexey.palazhchenko@talos-systems.com>
  • Loading branch information
AlekSi committed Oct 20, 2021
1 parent e77d81f commit d32814e
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 19 deletions.
4 changes: 4 additions & 0 deletions hack/containerd.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,9 @@ disabled_plugins = ["io.containerd.snapshotter.v1.aufs", "io.containerd.v1.zfs",

imports = ["/var/cri/conf.d/*.toml"]

[debug]
level = "info"
format = "json"

[plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc]
runtime_type = "io.containerd.runc.v2"
22 changes: 13 additions & 9 deletions internal/app/machined/pkg/runtime/logging/cicrular.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ package logging

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -55,7 +55,8 @@ func (manager *CircularBufferLoggingManager) ServiceLog(id string) runtime.LogHa
manager: manager,
id: id,
fields: map[string]interface{}{
"component": id,
// use field name that is not used by anything else
"talos-service": id,
},
}
}
Expand Down Expand Up @@ -198,15 +199,18 @@ func (handler *circularHandler) runSender() error {

scanner := bufio.NewScanner(r)
for scanner.Scan() {
l := strings.TrimSpace(scanner.Text())
if l == "" {
l := bytes.TrimSpace(scanner.Bytes())
if len(l) == 0 {
continue
}

// TODO(aleksi): extract fields from msg there or in jsonSender
e := &runtime.LogEvent{
Msg: l,
Fields: handler.fields,
e := parseLogLine(l, time.Now())
if e.Fields == nil {
e.Fields = handler.fields
} else {
for k, v := range handler.fields {
e.Fields[k] = v
}
}

handler.resend(e)
Expand All @@ -229,7 +233,6 @@ func (handler *circularHandler) resend(e *runtime.LogEvent) {
break
}

handler.manager.fallbackLogger.Printf("waiting for sender at %s", time.Now())
<-changed
}

Expand All @@ -243,6 +246,7 @@ func (handler *circularHandler) resend(e *runtime.LogEvent) {
return
}

// TODO(aleksi): remove or make less noisy
handler.manager.fallbackLogger.Print(err)

if errors.Is(err, runtime.ErrDontRetry) {
Expand Down
67 changes: 67 additions & 0 deletions internal/app/machined/pkg/runtime/logging/extract.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package logging

import (
"encoding/json"
"strings"
"time"

"go.uber.org/zap/zapcore"

"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
)

func parseLogLine(l []byte, now time.Time) *runtime.LogEvent {
e := &runtime.LogEvent{
Msg: string(l),
Time: now,
Level: zapcore.InfoLevel,
}

var m map[string]interface{}
if err := json.Unmarshal(l, &m); err != nil {
return e
}

if msgS, ok := m["msg"].(string); ok {
e.Msg = strings.TrimSpace(msgS)

delete(m, "msg")
}

for _, k := range []string{"time", "ts"} {
if timeS, ok := m[k].(string); ok {
t, err := time.Parse(time.RFC3339Nano, timeS)
if err == nil {
e.Time = t

delete(m, k)

break
}
}
}

if levelS, ok := m["level"].(string); ok {
levelS = strings.ToLower(levelS)

// convert containerd's logrus' level to zap's level
if levelS == "warning" {
levelS = "warn"
}

var level zapcore.Level
if err := level.UnmarshalText([]byte(levelS)); err == nil {
e.Level = level

delete(m, "level")
}
}

e.Fields = m

return e
}
65 changes: 65 additions & 0 deletions internal/app/machined/pkg/runtime/logging/extract_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package logging //nolint:testpackage

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap/zapcore"

"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
)

func TestParseLogLine(t *testing.T) {
t.Parallel()

now := time.Date(2021, 10, 19, 12, 42, 37, 123456789, time.UTC)

for name, tc := range map[string]struct {
l string
expected *runtime.LogEvent
}{
"machined": {
l: `[talos] task updateBootloader (1/1): done, 219.885384ms`,
expected: &runtime.LogEvent{
Msg: `[talos] task updateBootloader (1/1): done, 219.885384ms`,
Time: now,
Level: zapcore.InfoLevel,
},
},
"etcd-zap": {
l: `{"level":"info","ts":"2021-10-19T14:53:05.815Z","caller":"mvcc/kvstore_compaction.go:57","msg":"finished scheduled compaction","compact-revision":34567,"took":"21.041639ms"}`,
expected: &runtime.LogEvent{
Msg: `finished scheduled compaction`,
Time: time.Date(2021, 10, 19, 14, 53, 5, 815000000, time.UTC),
Level: zapcore.InfoLevel,
Fields: map[string]interface{}{
"caller": "mvcc/kvstore_compaction.go:57",
"compact-revision": float64(34567),
"took": "21.041639ms",
},
},
},
"cri-logrus": {
l: `{"level":"warning","msg":"cleanup warnings time=\"2021-10-19T14:52:20Z\" level=info msg=\"starting signal loop\" namespace=k8s.io pid=2629\n","time":"2021-10-19T14:52:20.578858689Z"}`,
expected: &runtime.LogEvent{
Msg: `cleanup warnings time="2021-10-19T14:52:20Z" level=info msg="starting signal loop" namespace=k8s.io pid=2629`,
Time: time.Date(2021, 10, 19, 14, 52, 20, 578858689, time.UTC),
Level: zapcore.WarnLevel,
Fields: map[string]interface{}{},
},
},
} {
name, tc := name, tc
t.Run(name, func(t *testing.T) {
t.Parallel()

actual := parseLogLine([]byte(tc.l), now)
assert.Equal(t, tc.expected, actual)
})
}
}
22 changes: 12 additions & 10 deletions internal/app/machined/pkg/runtime/logging/sender_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/json"
"fmt"
"net"
"time"

"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
)
Expand Down Expand Up @@ -47,21 +48,22 @@ func (j jsonSender) tryLock(ctx context.Context) (unlock func()) {
return
}

// Send implements LogSender interface.
func (j *jsonSender) Send(ctx context.Context, e *runtime.LogEvent) error {
func (j *jsonSender) marshalJSON(e *runtime.LogEvent) ([]byte, error) {
m := make(map[string]interface{}, len(e.Fields)+3)

// TODO(aleksi): extract fields from msg there or in circularHandler

m["msg"] = e.Msg
m["time"] = e.Time.Unix()
m["level"] = e.Level.String()

for k, v := range e.Fields {
m[k] = v
}

b, err := json.Marshal(m)
m["msg"] = e.Msg
m["talos-time"] = e.Time.Format(time.RFC3339Nano)
m["talos-level"] = e.Level.String()

return json.Marshal(m)
}

// Send implements LogSender interface.
func (j *jsonSender) Send(ctx context.Context, e *runtime.LogEvent) error {
b, err := j.marshalJSON(e)
if err != nil {
return fmt.Errorf("%w: %s", runtime.ErrDontRetry, err)
}
Expand Down

0 comments on commit d32814e

Please sign in to comment.