Skip to content

Commit

Permalink
fix plugin logger forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
Janelle Law committed Nov 2, 2023
1 parent b90a3a0 commit 7b9df91
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 121 deletions.
3 changes: 2 additions & 1 deletion pkg/agent/v2/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ func New(ctx context.Context, conf *v1beta1.AgentConfig, opts ...AgentOption) (*
if conf.Spec.LogLevel != "" {
level = logger.ParseLevel(conf.Spec.LogLevel)
}
lg := logger.New(logger.WithLogLevel(level), logger.WithFileWriter(logger.WriteOnlyFile(id))).WithGroup("agent")
fileWriter := logger.InitPluginWriter(id)
lg := logger.New(logger.WithLogLevel(level), logger.WithFileWriter(fileWriter)).WithGroup("agent")
lg.Debug(fmt.Sprintf("using log level: %s", level.String()))

var pl *plugins.PluginLoader
Expand Down
8 changes: 4 additions & 4 deletions pkg/logger/color_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,22 +189,22 @@ func (h *colorHandler) appendLevel(buf *buffer, level slog.Level) {
switch {
case level < slog.LevelInfo:
buf.WriteStringIf(h.colorEnabled, ansiBrightMagenta)
buf.WriteString("DEBUG")
buf.WriteString(levelString[0])
appendLevelDelta(buf, level-slog.LevelDebug)
buf.WriteStringIf(h.colorEnabled, ansiReset)
case level < slog.LevelWarn:
buf.WriteStringIf(h.colorEnabled, ansiBrightBlue)
buf.WriteString("INFO")
buf.WriteString(levelString[1])
appendLevelDelta(buf, level-slog.LevelInfo)
buf.WriteStringIf(h.colorEnabled, ansiReset)
case level < slog.LevelError:
buf.WriteStringIf(h.colorEnabled, ansiBrightYellow)
buf.WriteString("WARN")
buf.WriteString(levelString[2])
appendLevelDelta(buf, level-slog.LevelWarn)
buf.WriteStringIf(h.colorEnabled, ansiReset)
default:
buf.WriteStringIf(h.colorEnabled, ansiBrightRed)
buf.WriteString("ERROR")
buf.WriteString(levelString[3])
appendLevelDelta(buf, level-slog.LevelError)
buf.WriteStringIf(h.colorEnabled, ansiReset)
}
Expand Down
32 changes: 14 additions & 18 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,23 @@ import (
"io"
"log/slog"
"os"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/go-logr/logr/slogr"
"github.com/kralicky/gpkg/sync"
gpkgsync "github.com/kralicky/gpkg/sync"
slogmulti "github.com/samber/slog-multi"
slogsampling "github.com/samber/slog-sampling"
"github.com/spf13/afero"
)

const (
pluginGroupPrefix = "plugin"
NoRepeatInterval = 3600 * time.Hour // arbitrarily long time to denote one-time sampling
errKey = "err"
)

var (
asciiLogo = ` _
____ ____ ____ (_)
Expand All @@ -28,15 +35,15 @@ var (
DefaultLogLevel = slog.LevelDebug
DefaultWriter io.Writer
DefaultAddSource = true
pluginGroupPrefix = "plugin"
NoRepeatInterval = 3600 * time.Hour // arbitrarily long time to denote one-time sampling
logFs afero.Fs
DefaultTimeFormat = "2006 Jan 02 15:04:05"
errKey = "err"
logSampler = &sampler{}
PluginFileWriter = &remoteFileWriter{
mu: &sync.Mutex{},
}
levelString = []string{"DEBUG", "INFO", "WARN", "ERROR"}
)

var logSampler = &sampler{}

func init() {
logFs = afero.NewMemMapFs()
}
Expand Down Expand Up @@ -190,7 +197,6 @@ func colorHandlerWithOptions(opts ...LoggerOption) slog.Handler {

if options.FileWriter != nil {
logFileHandler := newProtoHandler(options.FileWriter, ConfigureProtoOptions(options))

// distribute logs to handlers in parallel
return slogmulti.Fanout(handler, logFileHandler)
}
Expand Down Expand Up @@ -218,13 +224,11 @@ func NewNop() *slog.Logger {
}

func NewPluginLogger(opts ...LoggerOption) *slog.Logger {
opts = append(opts, WithFileWriter(sharedPluginWriter))

return New(opts...).WithGroup(pluginGroupPrefix)
}

type sampler struct {
dropped sync.Map[string, uint64]
dropped gpkgsync.Map[string, uint64]
}

func (s *sampler) onDroppedHook(_ context.Context, r slog.Record) {
Expand All @@ -248,11 +252,3 @@ func WriteOnlyFile(clusterID string) afero.File {
}
return f
}

func GetFileIfExists(clusterID string) afero.File {
f, err := logFs.Open(clusterID)
if err != nil {
return nil
}
return f
}
97 changes: 97 additions & 0 deletions pkg/logger/plugin_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package logger

import (
"io"
"log/slog"
"regexp"
"strings"
"sync"
)

const (
numGoPluginSegments = 4 // hclog timestamp + hclog level + go-plugin name + log message
logDelimiter = " "
)

var (
logLevelPattern = `\(` + strings.Join(levelString, "|") + `\)`
pluginNamePattern = pluginGroupPrefix + `\.\w+`
)

// forwards plugin logs to their hosts. Retrieve these logs with the "debug" cli
type remoteFileWriter struct {
logForwarder *slog.Logger
w io.Writer
mu *sync.Mutex
}

func InitPluginWriter(agentId string) io.Writer {
PluginFileWriter.mu.Lock()
defer PluginFileWriter.mu.Unlock()

if PluginFileWriter.w != nil {
return PluginFileWriter.w
}

f := WriteOnlyFile(agentId)
writer := f.(io.Writer)
PluginFileWriter.w = writer
PluginFileWriter.logForwarder = newPluginLogForwarder(writer).WithGroup("forwarded")
return writer
}

func (f remoteFileWriter) Write(b []byte) (int, error) {
if f.w == nil || f.logForwarder == nil {
return 0, nil
}

// workaround to remove hclog's default DEBUG prefix on all non-hclog logs
prefixes := strings.SplitN(string(b), logDelimiter, numGoPluginSegments)
if len(prefixes) != numGoPluginSegments {
f.logForwarder.Info(string(b))
return len(b), nil
}

forwardedLog := prefixes[numGoPluginSegments-1]

levelPattern := regexp.MustCompile(logLevelPattern)
level := levelPattern.FindString(forwardedLog)

namePattern := regexp.MustCompile(pluginNamePattern)
loggerName := namePattern.FindString(forwardedLog)

namedLogger := f.logForwarder.WithGroup(loggerName)

switch level {
case levelString[0]:
namedLogger.Debug(forwardedLog)
case levelString[1]:
namedLogger.Info(forwardedLog)
case levelString[2]:
namedLogger.Warn(forwardedLog)
case levelString[3]:
namedLogger.Error(forwardedLog)
default:
namedLogger.Debug(forwardedLog)
}

return len(b), nil
}

func newPluginLogForwarder(w io.Writer) *slog.Logger {
dropForwardedPrefixes := func(groups []string, a slog.Attr) slog.Attr {
switch a.Key {
case slog.SourceKey:
return slog.Attr{}
}
return a
}

options := &slog.HandlerOptions{
AddSource: true,
Level: slog.LevelDebug,
ReplaceAttr: dropForwardedPrefixes,
}

return slog.New(newProtoHandler(w, options))
}
44 changes: 0 additions & 44 deletions pkg/logger/plugin_writer.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/logger/remotelogs/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func (ls *LogServer) StreamLogs(req *controlv1.LogStreamRequest, server controlv
return nil
}
if err != nil {
return err
ls.logger.Warn("malformed log record, skipping", logger.Err(err))
continue
}

if minLevel != nil && logger.ParseLevel(msg.Level) < slog.Level(*minLevel) {
Expand Down Expand Up @@ -123,7 +124,6 @@ func (ls *LogServer) getLogMessage(f afero.File) (*controlv1.StructuredLogRecord
}

if err := proto.Unmarshal(recordBytes, record); err != nil {
ls.logger.Error("failed to unmarshal record bytes")
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/opni/commands/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func parseTimeOrDie(timeStr string) time.Time {
}
t, err := when.EN.Parse(timeStr, time.Now())
if err != nil || t == nil {
lg.Error("could not interpret start time")
lg.Error("could not interpret start/end time")
os.Exit(1)
}
return t.Time
Expand Down
7 changes: 4 additions & 3 deletions pkg/plugins/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/rancher/opni/pkg/auth/cluster"
"github.com/rancher/opni/pkg/auth/session"
"github.com/rancher/opni/pkg/caching"
"github.com/rancher/opni/pkg/logger"
"github.com/rancher/opni/pkg/plugins/meta"
"github.com/rancher/opni/pkg/util/streams"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand Down Expand Up @@ -51,7 +52,8 @@ func ClientConfig(md meta.PluginMeta, scheme meta.Scheme, opts ...ClientOption)
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Managed: true,
Logger: hclog.New(&hclog.LoggerOptions{
Level: hclog.Error,
Level: hclog.Debug,
Output: logger.PluginFileWriter,
}),
GRPCDialOptions: []grpc.DialOption{
grpc.WithChainUnaryInterceptor(
Expand All @@ -61,8 +63,7 @@ func ClientConfig(md meta.PluginMeta, scheme meta.Scheme, opts ...ClientOption)
grpc.WithPerRPCCredentials(cluster.ClusterIDKey),
grpc.WithPerRPCCredentials(session.AttributesKey),
},
SyncStderr: os.Stderr,
Stderr: os.Stderr,
Stderr: os.Stderr,
}

if options.reattach != nil {
Expand Down
12 changes: 0 additions & 12 deletions plugins/alerting/pkg/agent/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package agent
import (
capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1"
controlv1 "github.com/rancher/opni/pkg/apis/control/v1"
"github.com/rancher/opni/pkg/logger"
streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream"
"github.com/rancher/opni/plugins/alerting/pkg/apis/node"
"github.com/rancher/opni/plugins/alerting/pkg/apis/rules"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)

func (p *Plugin) StreamServers() []streamext.Server {
Expand All @@ -26,8 +24,6 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) {
identityClient := controlv1.NewIdentityClient(cc)
ruleSyncClient := rules.NewRuleSyncClient(cc)

p.configureLoggers(identityClient)

p.node.SetClients(
healthListenerClient,
nodeClient,
Expand All @@ -36,11 +32,3 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) {

p.ruleStreamer.SetClients(ruleSyncClient)
}

func (p *Plugin) configureLoggers(identityClient controlv1.IdentityClient) {
id, err := identityClient.Whoami(p.ctx, &emptypb.Empty{})
if err != nil {
p.lg.Error("error fetching node id", logger.Err(err))
}
logger.InitPluginWriter(id.GetId())
}
13 changes: 0 additions & 13 deletions plugins/logging/pkg/agent/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ package agent

import (
capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1"
controlv1 "github.com/rancher/opni/pkg/apis/control/v1"
"github.com/rancher/opni/pkg/logger"
streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream"
"github.com/rancher/opni/plugins/logging/apis/node"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)

func (p *Plugin) StreamServers() []streamext.Server {
Expand All @@ -25,17 +22,7 @@ func (p *Plugin) StreamServers() []streamext.Server {
}

func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) {
p.configureLoggers(cc)
nodeClient := node.NewNodeLoggingCapabilityClient(cc)
p.node.SetClient(nodeClient)
p.otelForwarder.SetClient(cc)
}

func (p *Plugin) configureLoggers(cc grpc.ClientConnInterface) {
identityClient := controlv1.NewIdentityClient(cc)
id, err := identityClient.Whoami(p.ctx, &emptypb.Empty{})
if err != nil {
p.logger.Error("error fetching node id", logger.Err(err))
}
logger.InitPluginWriter(id.GetId())
}
Loading

0 comments on commit 7b9df91

Please sign in to comment.