Skip to content

Commit

Permalink
clean up plugin logger initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Janelle Law committed Oct 18, 2023
1 parent eebb5c7 commit 82e1abf
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 114 deletions.
3 changes: 2 additions & 1 deletion pkg/agent/v2/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ func New(ctx context.Context, conf *v1beta1.AgentConfig, opts ...AgentOption) (*
if conf.Spec.LogLevel != "" {
level = logger.ParseLevel(conf.Spec.LogLevel)
}
lg := logger.New(logger.WithLogLevel(level), logger.WithFileWriter(logger.WriteOnlyFile(id))).WithGroup("agent")
fileWriter := logger.InitPluginWriter(id)
lg := logger.New(logger.WithLogLevel(level), logger.WithFileWriter(fileWriter)).WithGroup("agent")
lg.Debug("using log level:", "level", level.String())

var pl *plugins.PluginLoader
Expand Down
39 changes: 39 additions & 0 deletions pkg/logger/file_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package logger

import (
"io"
"sync"
)

type fileWriter struct {
w io.Writer
mu *sync.Mutex
}

func InitPluginWriter(agentId string) io.Writer {
sharedPluginWriter.mu.Lock()
defer sharedPluginWriter.mu.Unlock()
if sharedPluginWriter.w != nil {
return sharedPluginWriter.w
}

f := WriteOnlyFile(agentId)
writer := f.(io.Writer)
sharedPluginWriter.w = writer
return writer
}

func (fw fileWriter) Write(b []byte) (int, error) {
fw.mu.Lock()
defer fw.mu.Unlock()
if fw.w == nil {
return 0, nil
}

n, err := fw.w.Write(b)
if err != nil {
return n, err
}

return n, nil
}
38 changes: 18 additions & 20 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,23 @@ import (
"io"
"log/slog"
"os"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/go-logr/logr/slogr"
"github.com/kralicky/gpkg/sync"
gpkgsync "github.com/kralicky/gpkg/sync"
slogmulti "github.com/samber/slog-multi"
slogsampling "github.com/samber/slog-sampling"
"github.com/spf13/afero"
)

const (
pluginGroupPrefix = "plugin"
NoRepeatInterval = 3600 * time.Hour // arbitrarily long time to denote one-time sampling
errKey = "err"
)

var (
asciiLogo = ` _
____ ____ ____ (_)
Expand All @@ -25,18 +32,17 @@ var (
/_/
Observability + AIOps for Kubernetes
`
DefaultLogLevel = slog.LevelDebug
DefaultWriter io.Writer
DefaultAddSource = true
pluginGroupPrefix = "plugin"
NoRepeatInterval = 3600 * time.Hour // arbitrarily long time to denote one-time sampling
logFs afero.Fs
DefaultTimeFormat = "2006 Jan 02 15:04:05"
errKey = "err"
DefaultLogLevel = slog.LevelDebug
DefaultWriter io.Writer
DefaultAddSource = true
logFs afero.Fs
DefaultTimeFormat = "2006 Jan 02 15:04:05"
logSampler = &sampler{}
sharedPluginWriter = &fileWriter{
mu: &sync.Mutex{},
}
)

var logSampler = &sampler{}

func init() {
logFs = afero.NewMemMapFs()
}
Expand Down Expand Up @@ -225,7 +231,7 @@ func NewPluginLogger(opts ...LoggerOption) *slog.Logger {
}

type sampler struct {
dropped sync.Map[string, uint64]
dropped gpkgsync.Map[string, uint64]
}

func (s *sampler) onDroppedHook(_ context.Context, r slog.Record) {
Expand All @@ -249,11 +255,3 @@ func WriteOnlyFile(clusterID string) afero.File {
}
return f
}

func GetFileIfExists(clusterID string) afero.File {
f, err := logFs.Open(clusterID)
if err != nil {
return nil
}
return f
}
44 changes: 0 additions & 44 deletions pkg/logger/plugin_writer.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/opni/commands/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func parseTimeOrDie(timeStr string) time.Time {
}
t, err := when.EN.Parse(timeStr, time.Now())
if err != nil || t == nil {
lg.Error("could not interpret start time")
lg.Error("could not interpret start/end time")
os.Exit(1)
}
return t.Time
Expand Down
12 changes: 0 additions & 12 deletions plugins/alerting/pkg/agent/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package agent
import (
capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1"
controlv1 "github.com/rancher/opni/pkg/apis/control/v1"
"github.com/rancher/opni/pkg/logger"
streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream"
"github.com/rancher/opni/plugins/alerting/pkg/apis/node"
"github.com/rancher/opni/plugins/alerting/pkg/apis/rules"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)

func (p *Plugin) StreamServers() []streamext.Server {
Expand All @@ -26,8 +24,6 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) {
identityClient := controlv1.NewIdentityClient(cc)
ruleSyncClient := rules.NewRuleSyncClient(cc)

p.configureLoggers(identityClient)

p.node.SetClients(
healthListenerClient,
nodeClient,
Expand All @@ -36,11 +32,3 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) {

p.ruleStreamer.SetClients(ruleSyncClient)
}

func (p *Plugin) configureLoggers(identityClient controlv1.IdentityClient) {
id, err := identityClient.Whoami(p.ctx, &emptypb.Empty{})
if err != nil {
p.lg.Error("error fetching node id", logger.Err(err))
}
logger.InitPluginWriter(id.GetId())
}
13 changes: 0 additions & 13 deletions plugins/logging/pkg/agent/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ package agent

import (
capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1"
controlv1 "github.com/rancher/opni/pkg/apis/control/v1"
"github.com/rancher/opni/pkg/logger"
streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream"
"github.com/rancher/opni/plugins/logging/apis/node"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)

func (p *Plugin) StreamServers() []streamext.Server {
Expand All @@ -25,17 +22,7 @@ func (p *Plugin) StreamServers() []streamext.Server {
}

func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) {
p.configureLoggers(cc)
nodeClient := node.NewNodeLoggingCapabilityClient(cc)
p.node.SetClient(nodeClient)
p.otelForwarder.SetClient(cc)
}

func (p *Plugin) configureLoggers(cc grpc.ClientConnInterface) {
identityClient := controlv1.NewIdentityClient(cc)
id, err := identityClient.Whoami(p.ctx, &emptypb.Empty{})
if err != nil {
p.logger.Error("error fetching node id", logger.Err(err))
}
logger.InitPluginWriter(id.GetId())
}
12 changes: 0 additions & 12 deletions plugins/metrics/pkg/agent/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ import (
capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1"
controlv1 "github.com/rancher/opni/pkg/apis/control/v1"
"github.com/rancher/opni/pkg/clients"
"github.com/rancher/opni/pkg/logger"
streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream"
"github.com/rancher/opni/plugins/metrics/apis/node"
"github.com/rancher/opni/plugins/metrics/apis/remoteread"
"github.com/rancher/opni/plugins/metrics/apis/remotewrite"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)

func (p *Plugin) StreamServers() []streamext.Server {
Expand All @@ -31,8 +29,6 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) {
healthListenerClient := controlv1.NewHealthListenerClient(cc)
identityClient := controlv1.NewIdentityClient(cc)

p.configureLoggers(identityClient)

p.httpServer.SetRemoteWriteClient(clients.NewLocker(cc, remotewrite.NewRemoteWriteClient))
p.ruleStreamer.SetRemoteWriteClient(remotewrite.NewRemoteWriteClient(cc))
p.node.SetRemoteWriter(clients.NewLocker(cc, remotewrite.NewRemoteWriteClient))
Expand All @@ -44,11 +40,3 @@ func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) {
)

}

func (p *Plugin) configureLoggers(identityClient controlv1.IdentityClient) {
id, err := identityClient.Whoami(p.ctx, &emptypb.Empty{})
if err != nil {
p.logger.Error("error fetching node id", logger.Err(err))
}
logger.InitPluginWriter(id.GetId())
}
11 changes: 0 additions & 11 deletions plugins/topology/pkg/topology/agent/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (
// "github.com/rancher/opni/pkg/clients"
controlv1 "github.com/rancher/opni/pkg/apis/control/v1"

"github.com/rancher/opni/pkg/logger"
streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)

func (p *Plugin) StreamServers() []streamext.Server {
Expand All @@ -25,16 +23,7 @@ func (p *Plugin) StreamServers() []streamext.Server {

func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) {
p.topologyStreamer.SetIdentityClient(controlv1.NewIdentityClient(cc))
p.configureLoggers()

p.topologyStreamer.SetTopologyStreamClient(stream.NewRemoteTopologyClient(cc))
p.node.SetClient(node.NewNodeTopologyCapabilityClient(cc))
}

func (p *Plugin) configureLoggers() {
id, err := p.topologyStreamer.identityClient.Whoami(p.ctx, &emptypb.Empty{})
if err != nil {
p.logger.Error("error fetching node id", logger.Err(err))
}
logger.InitPluginWriter(id.GetId())
}

0 comments on commit 82e1abf

Please sign in to comment.