From eebb5c75021827476d3be74fef44e42298e67931 Mon Sep 17 00:00:00 2001 From: Janelle Law Date: Wed, 18 Oct 2023 16:25:26 +0000 Subject: [PATCH] attempt to fix data race --- pkg/logger/logger.go | 13 ++++++++++--- pkg/logger/plugin_writer.go | 11 ++++++++--- plugins/alerting/pkg/agent/stream.go | 2 +- plugins/logging/pkg/agent/stream.go | 2 +- plugins/metrics/pkg/agent/stream.go | 2 +- plugins/topology/pkg/topology/agent/stream.go | 2 +- 6 files changed, 22 insertions(+), 10 deletions(-) diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index c24dd55ade..4b5c16ed42 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -31,7 +31,6 @@ var ( pluginGroupPrefix = "plugin" NoRepeatInterval = 3600 * time.Hour // arbitrarily long time to denote one-time sampling logFs afero.Fs - logFileName = "opni-logs" DefaultTimeFormat = "2006 Jan 02 15:04:05" errKey = "err" ) @@ -236,7 +235,7 @@ func (s *sampler) onDroppedHook(_ context.Context, r slog.Record) { } func ReadOnlyFile(clusterID string) afero.File { - f, err := logFs.OpenFile(logFileName+clusterID, os.O_RDONLY|os.O_CREATE, 0666) + f, err := logFs.OpenFile(clusterID, os.O_RDONLY|os.O_CREATE, 0666) if err != nil { panic(err) } @@ -244,9 +243,17 @@ func ReadOnlyFile(clusterID string) afero.File { } func WriteOnlyFile(clusterID string) afero.File { - f, err := logFs.OpenFile(logFileName+clusterID, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + f, err := logFs.OpenFile(clusterID, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) if err != nil { panic(err) } return f } + +func GetFileIfExists(clusterID string) afero.File { + f, err := logFs.Open(clusterID) + if err != nil { + return nil + } + return f +} diff --git a/pkg/logger/plugin_writer.go b/pkg/logger/plugin_writer.go index 2373163300..597df3bf85 100644 --- a/pkg/logger/plugin_writer.go +++ b/pkg/logger/plugin_writer.go @@ -6,7 +6,6 @@ import ( ) var sharedPluginWriter = &pluginWriter{ - w: &DefaultWriter, mu: &sync.Mutex{}, } @@ -15,10 +14,16 @@ type pluginWriter struct { mu *sync.Mutex } -func SetPluginWriter(agentId string) { +func InitPluginWriter(agentId string) { sharedPluginWriter.mu.Lock() defer sharedPluginWriter.mu.Unlock() - f := WriteOnlyFile(agentId) + if sharedPluginWriter.w != nil { + return + } + f := GetFileIfExists(agentId) + if f == nil { + f = WriteOnlyFile(agentId) + } fileWriter := f.(io.Writer) sharedPluginWriter.w = &fileWriter } diff --git a/plugins/alerting/pkg/agent/stream.go b/plugins/alerting/pkg/agent/stream.go index 37592f47be..e9f91017d1 100644 --- a/plugins/alerting/pkg/agent/stream.go +++ b/plugins/alerting/pkg/agent/stream.go @@ -42,5 +42,5 @@ func (p *Plugin) configureLoggers(identityClient controlv1.IdentityClient) { if err != nil { p.lg.Error("error fetching node id", logger.Err(err)) } - logger.SetPluginWriter(id.GetId()) + logger.InitPluginWriter(id.GetId()) } diff --git a/plugins/logging/pkg/agent/stream.go b/plugins/logging/pkg/agent/stream.go index d98544c3a1..c7c73daaba 100644 --- a/plugins/logging/pkg/agent/stream.go +++ b/plugins/logging/pkg/agent/stream.go @@ -37,5 +37,5 @@ func (p *Plugin) configureLoggers(cc grpc.ClientConnInterface) { if err != nil { p.logger.Error("error fetching node id", logger.Err(err)) } - logger.SetPluginWriter(id.GetId()) + logger.InitPluginWriter(id.GetId()) } diff --git a/plugins/metrics/pkg/agent/stream.go b/plugins/metrics/pkg/agent/stream.go index 6eb236f32f..7598038b15 100644 --- a/plugins/metrics/pkg/agent/stream.go +++ b/plugins/metrics/pkg/agent/stream.go @@ -50,5 +50,5 @@ func (p *Plugin) configureLoggers(identityClient controlv1.IdentityClient) { if err != nil { p.logger.Error("error fetching node id", logger.Err(err)) } - logger.SetPluginWriter(id.GetId()) + logger.InitPluginWriter(id.GetId()) } diff --git a/plugins/topology/pkg/topology/agent/stream.go b/plugins/topology/pkg/topology/agent/stream.go index 655770273c..d2731fb843 100644 --- a/plugins/topology/pkg/topology/agent/stream.go +++ b/plugins/topology/pkg/topology/agent/stream.go @@ -36,5 +36,5 @@ func (p *Plugin) configureLoggers() { if err != nil { p.logger.Error("error fetching node id", logger.Err(err)) } - logger.SetPluginWriter(id.GetId()) + logger.InitPluginWriter(id.GetId()) }