From 82e1abfe548e691c17ea6ca1a8a2f1683b39a044 Mon Sep 17 00:00:00 2001 From: Janelle Law Date: Wed, 18 Oct 2023 18:12:03 +0000 Subject: [PATCH] clean up plugin logger initialization --- pkg/agent/v2/agent.go | 3 +- pkg/logger/file_writer.go | 39 ++++++++++++++++ pkg/logger/logger.go | 38 ++++++++-------- pkg/logger/plugin_writer.go | 44 ------------------- pkg/opni/commands/admin.go | 2 +- plugins/alerting/pkg/agent/stream.go | 12 ----- plugins/logging/pkg/agent/stream.go | 13 ------ plugins/metrics/pkg/agent/stream.go | 12 ----- plugins/topology/pkg/topology/agent/stream.go | 11 ----- 9 files changed, 60 insertions(+), 114 deletions(-) create mode 100644 pkg/logger/file_writer.go delete mode 100644 pkg/logger/plugin_writer.go diff --git a/pkg/agent/v2/agent.go b/pkg/agent/v2/agent.go index f35fda154b..7945ea3115 100644 --- a/pkg/agent/v2/agent.go +++ b/pkg/agent/v2/agent.go @@ -137,7 +137,8 @@ func New(ctx context.Context, conf *v1beta1.AgentConfig, opts ...AgentOption) (* if conf.Spec.LogLevel != "" { level = logger.ParseLevel(conf.Spec.LogLevel) } - lg := logger.New(logger.WithLogLevel(level), logger.WithFileWriter(logger.WriteOnlyFile(id))).WithGroup("agent") + fileWriter := logger.InitPluginWriter(id) + lg := logger.New(logger.WithLogLevel(level), logger.WithFileWriter(fileWriter)).WithGroup("agent") lg.Debug("using log level:", "level", level.String()) var pl *plugins.PluginLoader diff --git a/pkg/logger/file_writer.go b/pkg/logger/file_writer.go new file mode 100644 index 0000000000..a80d06e052 --- /dev/null +++ b/pkg/logger/file_writer.go @@ -0,0 +1,39 @@ +package logger + +import ( + "io" + "sync" +) + +type fileWriter struct { + w io.Writer + mu *sync.Mutex +} + +func InitPluginWriter(agentId string) io.Writer { + sharedPluginWriter.mu.Lock() + defer sharedPluginWriter.mu.Unlock() + if sharedPluginWriter.w != nil { + return sharedPluginWriter.w + } + + f := WriteOnlyFile(agentId) + writer := f.(io.Writer) + sharedPluginWriter.w = writer + return writer +} + +func (fw fileWriter) Write(b []byte) (int, error) { + fw.mu.Lock() + defer fw.mu.Unlock() + if fw.w == nil { + return 0, nil + } + + n, err := fw.w.Write(b) + if err != nil { + return n, err + } + + return n, nil +} diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 4b5c16ed42..d10ca0f2d9 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -6,16 +6,23 @@ import ( "io" "log/slog" "os" + "sync" "time" "github.com/go-logr/logr" "github.com/go-logr/logr/slogr" - "github.com/kralicky/gpkg/sync" + gpkgsync "github.com/kralicky/gpkg/sync" slogmulti "github.com/samber/slog-multi" slogsampling "github.com/samber/slog-sampling" "github.com/spf13/afero" ) +const ( + pluginGroupPrefix = "plugin" + NoRepeatInterval = 3600 * time.Hour // arbitrarily long time to denote one-time sampling + errKey = "err" +) + var ( asciiLogo = ` _ ____ ____ ____ (_) @@ -25,18 +32,17 @@ var ( /_/ Observability + AIOps for Kubernetes ` - DefaultLogLevel = slog.LevelDebug - DefaultWriter io.Writer - DefaultAddSource = true - pluginGroupPrefix = "plugin" - NoRepeatInterval = 3600 * time.Hour // arbitrarily long time to denote one-time sampling - logFs afero.Fs - DefaultTimeFormat = "2006 Jan 02 15:04:05" - errKey = "err" + DefaultLogLevel = slog.LevelDebug + DefaultWriter io.Writer + DefaultAddSource = true + logFs afero.Fs + DefaultTimeFormat = "2006 Jan 02 15:04:05" + logSampler = &sampler{} + sharedPluginWriter = &fileWriter{ + mu: &sync.Mutex{}, + } ) -var logSampler = &sampler{} - func init() { logFs = afero.NewMemMapFs() } @@ -225,7 +231,7 @@ func NewPluginLogger(opts ...LoggerOption) *slog.Logger { } type sampler struct { - dropped sync.Map[string, uint64] + dropped gpkgsync.Map[string, uint64] } func (s *sampler) onDroppedHook(_ context.Context, r slog.Record) { @@ -249,11 +255,3 @@ func WriteOnlyFile(clusterID string) afero.File { } return f } - -func GetFileIfExists(clusterID string) afero.File { - f, err := logFs.Open(clusterID) - if err != nil { - return nil - } - return f -} diff --git a/pkg/logger/plugin_writer.go b/pkg/logger/plugin_writer.go deleted file mode 100644 index 597df3bf85..0000000000 --- a/pkg/logger/plugin_writer.go +++ /dev/null @@ -1,44 +0,0 @@ -package logger - -import ( - "io" - "sync" -) - -var sharedPluginWriter = &pluginWriter{ - mu: &sync.Mutex{}, -} - -type pluginWriter struct { - w *io.Writer - mu *sync.Mutex -} - -func InitPluginWriter(agentId string) { - sharedPluginWriter.mu.Lock() - defer sharedPluginWriter.mu.Unlock() - if sharedPluginWriter.w != nil { - return - } - f := GetFileIfExists(agentId) - if f == nil { - f = WriteOnlyFile(agentId) - } - fileWriter := f.(io.Writer) - sharedPluginWriter.w = &fileWriter -} - -func (pw pluginWriter) Write(b []byte) (int, error) { - pw.mu.Lock() - defer pw.mu.Unlock() - if pw.w == nil { - return 0, nil - } - - n, err := (*pw.w).Write(b) - if err != nil { - return n, err - } - - return n, nil -} diff --git a/pkg/opni/commands/admin.go b/pkg/opni/commands/admin.go index 93a79b7741..36969ae361 100644 --- a/pkg/opni/commands/admin.go +++ b/pkg/opni/commands/admin.go @@ -313,7 +313,7 @@ func parseTimeOrDie(timeStr string) time.Time { } t, err := when.EN.Parse(timeStr, time.Now()) if err != nil || t == nil { - lg.Error("could not interpret start time") + lg.Error("could not interpret start/end time") os.Exit(1) } return t.Time diff --git a/plugins/alerting/pkg/agent/stream.go b/plugins/alerting/pkg/agent/stream.go index e9f91017d1..10e835b77a 100644 --- a/plugins/alerting/pkg/agent/stream.go +++ b/plugins/alerting/pkg/agent/stream.go @@ -3,12 +3,10 @@ package agent import ( capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" controlv1 "github.com/rancher/opni/pkg/apis/control/v1" - "github.com/rancher/opni/pkg/logger" streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "github.com/rancher/opni/plugins/alerting/pkg/apis/node" "github.com/rancher/opni/plugins/alerting/pkg/apis/rules" "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/emptypb" ) func (p *Plugin) StreamServers() []streamext.Server { @@ -26,8 +24,6 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { identityClient := controlv1.NewIdentityClient(cc) ruleSyncClient := rules.NewRuleSyncClient(cc) - p.configureLoggers(identityClient) - p.node.SetClients( healthListenerClient, nodeClient, @@ -36,11 +32,3 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { p.ruleStreamer.SetClients(ruleSyncClient) } - -func (p *Plugin) configureLoggers(identityClient controlv1.IdentityClient) { - id, err := identityClient.Whoami(p.ctx, &emptypb.Empty{}) - if err != nil { - p.lg.Error("error fetching node id", logger.Err(err)) - } - logger.InitPluginWriter(id.GetId()) -} diff --git a/plugins/logging/pkg/agent/stream.go b/plugins/logging/pkg/agent/stream.go index c7c73daaba..b18478bb18 100644 --- a/plugins/logging/pkg/agent/stream.go +++ b/plugins/logging/pkg/agent/stream.go @@ -2,13 +2,10 @@ package agent import ( capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" - controlv1 "github.com/rancher/opni/pkg/apis/control/v1" - "github.com/rancher/opni/pkg/logger" streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "github.com/rancher/opni/plugins/logging/apis/node" collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1" "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/emptypb" ) func (p *Plugin) StreamServers() []streamext.Server { @@ -25,17 +22,7 @@ func (p *Plugin) StreamServers() []streamext.Server { } func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { - p.configureLoggers(cc) nodeClient := node.NewNodeLoggingCapabilityClient(cc) p.node.SetClient(nodeClient) p.otelForwarder.SetClient(cc) } - -func (p *Plugin) configureLoggers(cc grpc.ClientConnInterface) { - identityClient := controlv1.NewIdentityClient(cc) - id, err := identityClient.Whoami(p.ctx, &emptypb.Empty{}) - if err != nil { - p.logger.Error("error fetching node id", logger.Err(err)) - } - logger.InitPluginWriter(id.GetId()) -} diff --git a/plugins/metrics/pkg/agent/stream.go b/plugins/metrics/pkg/agent/stream.go index 7598038b15..73393ddc26 100644 --- a/plugins/metrics/pkg/agent/stream.go +++ b/plugins/metrics/pkg/agent/stream.go @@ -4,13 +4,11 @@ import ( capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" controlv1 "github.com/rancher/opni/pkg/apis/control/v1" "github.com/rancher/opni/pkg/clients" - "github.com/rancher/opni/pkg/logger" streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "github.com/rancher/opni/plugins/metrics/apis/node" "github.com/rancher/opni/plugins/metrics/apis/remoteread" "github.com/rancher/opni/plugins/metrics/apis/remotewrite" "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/emptypb" ) func (p *Plugin) StreamServers() []streamext.Server { @@ -31,8 +29,6 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { healthListenerClient := controlv1.NewHealthListenerClient(cc) identityClient := controlv1.NewIdentityClient(cc) - p.configureLoggers(identityClient) - p.httpServer.SetRemoteWriteClient(clients.NewLocker(cc, remotewrite.NewRemoteWriteClient)) p.ruleStreamer.SetRemoteWriteClient(remotewrite.NewRemoteWriteClient(cc)) p.node.SetRemoteWriter(clients.NewLocker(cc, remotewrite.NewRemoteWriteClient)) @@ -44,11 +40,3 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { ) } - -func (p *Plugin) configureLoggers(identityClient controlv1.IdentityClient) { - id, err := identityClient.Whoami(p.ctx, &emptypb.Empty{}) - if err != nil { - p.logger.Error("error fetching node id", logger.Err(err)) - } - logger.InitPluginWriter(id.GetId()) -} diff --git a/plugins/topology/pkg/topology/agent/stream.go b/plugins/topology/pkg/topology/agent/stream.go index d2731fb843..d6ae865a5f 100644 --- a/plugins/topology/pkg/topology/agent/stream.go +++ b/plugins/topology/pkg/topology/agent/stream.go @@ -8,10 +8,8 @@ import ( // "github.com/rancher/opni/pkg/clients" controlv1 "github.com/rancher/opni/pkg/apis/control/v1" - "github.com/rancher/opni/pkg/logger" streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/emptypb" ) func (p *Plugin) StreamServers() []streamext.Server { @@ -25,16 +23,7 @@ func (p *Plugin) StreamServers() []streamext.Server { func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { p.topologyStreamer.SetIdentityClient(controlv1.NewIdentityClient(cc)) - p.configureLoggers() p.topologyStreamer.SetTopologyStreamClient(stream.NewRemoteTopologyClient(cc)) p.node.SetClient(node.NewNodeTopologyCapabilityClient(cc)) } - -func (p *Plugin) configureLoggers() { - id, err := p.topologyStreamer.identityClient.Whoami(p.ctx, &emptypb.Empty{}) - if err != nil { - p.logger.Error("error fetching node id", logger.Err(err)) - } - logger.InitPluginWriter(id.GetId()) -}