Skip to content

Commit

Permalink
remove With to avoid allocs
Browse files Browse the repository at this point in the history
  • Loading branch information
Janelle Law committed Aug 14, 2023
1 parent 7b3a829 commit 6ad9c63
Show file tree
Hide file tree
Showing 51 changed files with 121 additions and 291 deletions.
8 changes: 2 additions & 6 deletions pkg/agent/v1/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ func (a *Agent) streamRuleGroupUpdates(ctx context.Context) (<-chan [][]byte, er
searchInterval = duration
}
notifier := notifier.NewPeriodicUpdateNotifier(ctx, finder, searchInterval)
a.logger.With(
"interval", searchInterval.String(),
).Debug("rule discovery notifier configured")
a.logger.Debug("rule discovery notifier configured", "interval", searchInterval.String())

notifierC := notifier.NotifyC(ctx)
a.logger.Debug("starting rule group update notifier")
Expand All @@ -82,9 +80,7 @@ func (a *Agent) marshalRuleGroups(ruleGroups []rules.RuleGroup) [][]byte {
for _, ruleGroup := range ruleGroups {
doc, err := yaml.Marshal(ruleGroup)
if err != nil {
a.logger.With(
"group", ruleGroup.Name,
).Error("failed to marshal rule group", "error", err)
a.logger.Error("failed to marshal rule group", "error", err, "group", ruleGroup.Name,)
continue
}
yamlDocs = append(yamlDocs, doc)
Expand Down
4 changes: 1 addition & 3 deletions pkg/agent/v2/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,7 @@ func (a *Agent) ListenAndServe(ctx context.Context) error {
if err != nil {
return err
}
a.logger.With(
"address", listener.Addr(),
).Info("agent http server starting")
a.logger.Info("agent http server starting", "address", listener.Addr())

ctx, ca := context.WithCancel(ctx)

Expand Down
12 changes: 3 additions & 9 deletions pkg/clients/gateway_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,12 @@ func (gc *gatewayClient) Connect(ctx context.Context) (_ grpc.ClientConnInterfac
var headerMd metadata.MD
splicedStream, err := streamClient.Connect(ctx, grpc.Header(&headerMd))
if err != nil {
gc.logger.With(
"name", sc.name,
).Warn("failed to connect to spliced stream, skipping", "error", err)
gc.logger.Warn("failed to connect to spliced stream, skipping", "name", sc.name, "error", err)
continue
}

if err := ts.Splice(splicedStream, totem.WithStreamName(sc.name)); err != nil {
gc.logger.With(
"name", sc.name,
).Warn("failed to splice remote stream, skipping", "error", err)
gc.logger.Warn("failed to splice remote stream, skipping", "error", err, "name", sc.name)
continue
}

Expand All @@ -213,9 +209,7 @@ func (gc *gatewayClient) Connect(ctx context.Context) (_ grpc.ClientConnInterfac
Type: streamv1.EventType_DiscoveryComplete,
CorrelationId: correlationId,
}); err != nil {
gc.logger.With(
"name", sc.name,
).Error("failed to notify remote stream", "error", err)
gc.logger.Error("failed to notify remote stream", "name", sc.name, "error", err)
}
}()
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/gateway/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ func (d *DelegateServer) HandleAgentConnection(ctx context.Context, clientSet ag

cluster, err := d.clusterStore.GetCluster(ctx, &corev1.Reference{Id: id})
if err != nil {
d.logger.With(
"id", id,
).Error("internal error: failed to look up connecting agent", "error", err)
d.logger.Error("internal error: failed to look up connecting agent", "error", err, "id", id)
return
}

Expand All @@ -67,14 +65,14 @@ func (d *DelegateServer) HandleAgentConnection(ctx context.Context, clientSet ag
labels: cluster.GetLabels(),
id: id,
}
d.logger.With("id", id).Debug("agent connected")
d.logger.Debug("agent connected", "id", id)
d.mu.Unlock()

<-ctx.Done()

d.mu.Lock()
delete(d.activeAgents, id)
d.logger.With("id", id).Debug("agent disconnected")
d.logger.Debug("agent disconnected", "id", id)
d.mu.Unlock()
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/gateway/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,7 @@ func (s *GatewayGRPCServer) ListenAndServe(ctx context.Context) error {
}
s.servicesMu.Unlock()

s.logger.With(
"address", listener.Addr().String(),
).Info("gateway gRPC server starting")
s.logger.Info("gateway gRPC server starting", "address", listener.Addr().String())

errC := lo.Async(func() error {
return server.Serve(listener)
Expand Down
10 changes: 2 additions & 8 deletions pkg/gateway/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,17 +198,11 @@ ROUTES:
for _, route := range cfg.Routes {
for _, reservedPrefix := range s.reservedPrefixRoutes {
if strings.HasPrefix(route.Path, reservedPrefix) {
s.logger.With(
"route", route.Method+" "+route.Path,
"plugin", pluginMeta.Module,
).Warn("skipping route for plugin as it conflicts with a reserved prefix")
s.logger.Warn("skipping route for plugin as it conflicts with a reserved prefix", "route", route.Method+" "+route.Path, "plugin", pluginMeta.Module)
continue ROUTES
}
}
s.logger.With(
"route", route.Method+" "+route.Path,
"plugin", pluginMeta.Module,
).Debug("configured route for plugin")
s.logger.Debug("configured route for plugin", "route", route.Method+" "+route.Path, "plugin", pluginMeta.Module)
s.router.Handle(route.Method, route.Path, forwarder)
}
}
20 changes: 5 additions & 15 deletions pkg/gateway/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,7 @@ func (s *StreamServer) Connect(stream streamv1.Stream_ConnectServer) error {
Id: id,
})
if err != nil {
s.logger.With(
"id", id,
).Error("failed to get cluster", "error", err)
s.logger.Error("failed to get cluster", "error", err, "id", id)
return err
}
eventC, err := s.clusterStore.WatchCluster(ctx, c)
Expand All @@ -137,15 +135,11 @@ func (s *StreamServer) Connect(stream streamv1.Stream_ConnectServer) error {
streamClient := streamv1.NewStreamClient(r.cc)
splicedStream, err := streamClient.Connect(ctx)
if err != nil {
s.logger.With(
"clusterId", c.Id,
).Warn("failed to connect to remote stream, skipping", "error", err)
s.logger.Warn("failed to connect to remote stream, skipping", "error", err, "clusterId", c.Id)
continue
}
if err := ts.Splice(splicedStream, totem.WithStreamName(r.name)); err != nil {
s.logger.With(
"clusterId", c.Id,
).Warn("failed to splice remote stream, skipping", "error", err)
s.logger.Warn("failed to splice remote stream, skipping", "error", err, "clusterId", c.Id)
continue
}
}
Expand Down Expand Up @@ -178,19 +172,15 @@ func (s *StreamServer) Connect(stream streamv1.Stream_ConnectServer) error {
}

func (s *StreamServer) RegisterService(desc *grpc.ServiceDesc, impl any) {
s.logger.With(
"service", desc.ServiceName,
).Debug("registering service")
s.logger.Debug("registering service", "service", desc.ServiceName)
if len(desc.Streams) > 0 {
panic("failed to register service: nested streams are currently not supported")
}
s.services = append(s.services, util.PackService(desc, impl))
}

func (s *StreamServer) registerInternalService(desc *grpc.ServiceDesc, impl any) {
s.logger.With(
"service", desc.ServiceName,
).Debug("registering internal service")
s.logger.Debug("registering internal service", "service", desc.ServiceName)
if len(desc.Streams) > 0 {
panic("failed to register internal service: nested streams are currently not supported")
}
Expand Down
9 changes: 3 additions & 6 deletions pkg/gateway/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (f *SyncRequester) HandleAgentConnection(ctx context.Context, clientSet age
f.mu.Lock()
id := cluster.StreamAuthorizedID(ctx)
f.activeAgents[id] = clientSet
f.logger.With("id", id).Debug("agent connected")
f.logger.Debug("agent connected", "id", id)
f.mu.Unlock()

// blocks until ctx is canceled
Expand All @@ -59,7 +59,7 @@ func (f *SyncRequester) HandleAgentConnection(ctx context.Context, clientSet age

f.mu.Lock()
delete(f.activeAgents, id)
f.logger.With("id", id).Debug("agent disconnected")
f.logger.Debug("agent disconnected", "id", id)
f.mu.Unlock()
}

Expand All @@ -85,10 +85,7 @@ func (f *SyncRequester) RequestSync(ctx context.Context, req *capabilityv1.SyncR
}

for _, clientSet := range toSync {
f.logger.With(
"agentId", req.GetCluster().GetId(),
"capabilities", req.GetFilter().GetCapabilityNames(),
).Debug("sending sync request to agent")
f.logger.Debug("sending sync request to agent", "agentId", req.GetCluster().GetId(), "capabilities", req.GetFilter().GetCapabilityNames())
_, err := clientSet.SyncNow(ctx, req.GetFilter())
code := status.Code(err)
mSyncRequests.WithLabelValues(req.GetCluster().GetId(), fmt.Sprint(code), code.String()).Inc()
Expand Down
4 changes: 1 addition & 3 deletions pkg/management/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ func (m *Server) configureApiExtensionDirector(ctx context.Context, pl plugins.L
reflectClient := grpcreflect.NewClient(ctx, rpb.NewServerReflectionClient(cc))
sds, err := p.Descriptors(ctx, &emptypb.Empty{})
if err != nil {
m.logger.With(
"plugin", md.Module,
).Error("failed to get extension descriptors", "error", err)
m.logger.Error("failed to get extension descriptors", "error", err, "plugin", md.Module)
return
}
for _, sd := range sds.Items {
Expand Down
5 changes: 1 addition & 4 deletions pkg/management/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,7 @@ func (m *Server) ListCapabilities(ctx context.Context, in *emptypb.Empty) (*mana
}
details, err := capability.Info(ctx, in)
if err != nil {
m.logger.With(
"capability", name,
"error", err,
).Error("failed to fetch capability details")
m.logger.Error("failed to fetch capability details", "capability", name, "error", err)
continue
}
items = append(items, &managementv1.CapabilityInfo{
Expand Down
3 changes: 2 additions & 1 deletion pkg/opni/commands/agent_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func BuildAgentV2Cmd() *cobra.Command {
path, err := config.FindConfig()
if err != nil {
if errors.Is(err, config.ErrConfigNotFound) {
agentlg.Error(`could not find a config file in current directory or ["/etc/opni"], and --config was not given`)
wd, _ := os.Getwd()
agentlg.Error(`could not find a config file in working directory or ["/etc/opni"], and --config was not given`, "workingDir", wd)
}
agentlg.Error("an error occurred while searching for a config file")
os.Exit(1)
Expand Down
18 changes: 9 additions & 9 deletions pkg/opni/commands/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,20 @@ func BuildSupportBootstrapCmd() *cobra.Command {
wd, _ := os.Getwd()
agentlg.Infof(`could not find a config file in ["%s", "$home/.opni], and --config was not given`, wd)
default:
agentlg.With(zap.Error(err)).Fatal(cmd.Context(), "an error occurred while searching for a config file")
agentlg.With(zap.Error(err)).Fatal("an error occurred while searching for a config file")
}
}

agentConfig := &v1beta1.SupportAgentConfig{}
if configFile != "" {
objects, err := config.LoadObjectsFromFile(configFile)
if err != nil {
agentlg.With(zap.Error(err)).Fatal(cmd.Context(), "failed to load config")
agentlg.With(zap.Error(err)).Fatal("failed to load config")
}
if ok := objects.Visit(func(config *v1beta1.SupportAgentConfig) {
agentConfig = config
}); !ok {
agentlg.Fatal(cmd.Context(), "no support agent config found in config file")
agentlg.Fatal("no support agent config found in config file")
}
} else {
agentConfig.TypeMeta = v1beta1.SupportAgentConfigTypeMeta
Expand All @@ -111,7 +111,7 @@ func BuildSupportBootstrapCmd() *cobra.Command {
case agentConfig.Spec.AuthData.Token != "":
token = agentConfig.Spec.AuthData.Token
default:
agentlg.Fatal(cmd.Context(), "no token provided")
agentlg.Fatal("no token provided")
}

bootstrapper, err := configureSupportAgentBootstrap(
Expand All @@ -123,22 +123,22 @@ func BuildSupportBootstrapCmd() *cobra.Command {
if err != nil {
agentlg.With(
zap.Error(err),
).Fatal(cmd.Context(), "failed to configure bootstrap")
).Fatal("failed to configure bootstrap")
}

ipBuilder, err := ident.GetProviderBuilder("supportagent")
if err != nil {
agentlg.With(
zap.Error(err),
).Fatal(cmd.Context(), "failed to get ident provider")
).Fatal("failed to get ident provider")
}
ip := ipBuilder(agentConfig)

userid, err := ip.UniqueIdentifier(ctx)
if err != nil {
agentlg.With(
zap.Error(err),
).Fatal(cmd.Context(), "failed to get unique identifier")
).Fatal("failed to get unique identifier")
}

kr, err := bootstrapper.Bootstrap(ctx, ip)
Expand Down Expand Up @@ -187,7 +187,7 @@ func BuildSupportPingCmd() *cobra.Command {
ctx, ca := context.WithCancel(waitctx.FromContext(cmd.Context()))
defer ca()

agentlg := logger.NewZap()
agentlg := logger.NewZap(logger.WithLogLevel(logger.ParseLevel(logLevel)))

config := supportagentconfig.MustLoadConfig(configFile, agentlg)

Expand Down Expand Up @@ -302,7 +302,7 @@ func BuildSupportPasswordCmd() *cobra.Command {
Use: "password",
Short: "Shows the initial password for Opensearch Dashboards",
Run: func(cmd *cobra.Command, args []string) {
agentlg := logger.NewZap()
agentlg := logger.NewZap(logger.WithLogLevel(logger.ParseLevel(logLevel)))

kr, err := supportagentconfig.LoadKeyring(getRetrievePassword)
if err != nil {
Expand Down
8 changes: 3 additions & 5 deletions pkg/plugins/apis/apiextensions/stream/plugin_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (e *agentStreamExtensionServerImpl) Connect(stream streamv1.Stream_ConnectS
// and reconnect.
return status.Errorf(codes.DeadlineExceeded, "stream client discovery timed out after %s", timeout)
case <-stream.Context().Done():
e.logger.With("error", stream.Context().Err()).Error("stream disconnected while waiting for discovery")
e.logger.Error("stream disconnected while waiting for discovery", "error", stream.Context().Err())
return stream.Context().Err()
}

Expand All @@ -143,7 +143,7 @@ func (e *agentStreamExtensionServerImpl) Connect(stream streamv1.Stream_ConnectS
}
case err := <-errC:
if err != nil {
e.logger.With("error", stream.Context().Err()).Error("stream encountered an error while waiting for discovery")
e.logger.Error("stream encountered an error while waiting for discovery", "error", stream.Context().Err())
return status.Errorf(codes.Internal, "stream encountered an error while waiting for discovery: %v", err)
}
}
Expand All @@ -160,9 +160,7 @@ func (e *agentStreamExtensionServerImpl) Connect(stream streamv1.Stream_ConnectS
}

func (e *agentStreamExtensionServerImpl) Notify(_ context.Context, event *streamv1.StreamEvent) (*emptypb.Empty, error) {
e.logger.With(
"type", event.Type.String(),
).Debug("received notify event for:", "name", e.name)
e.logger.Debug("received notify event for:", "name", e.name, "type", event.Type.String())
e.activeStreamsMu.Lock()
defer e.activeStreamsMu.Unlock()

Expand Down
4 changes: 1 addition & 3 deletions pkg/plugins/apis/apiextensions/stream/plugin_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,7 @@ type gatewayStreamExtensionServerImpl struct {
func (e *gatewayStreamExtensionServerImpl) Connect(stream streamv1.Stream_ConnectServer) error {
id := cluster.StreamAuthorizedID(stream.Context())

e.logger.With(
"id", id,
).Debug("stream connected")
e.logger.Debug("stream connected", "id", id)

opts := []totem.ServerOption{totem.WithName("plugin_" + e.name)}

Expand Down
20 changes: 4 additions & 16 deletions pkg/plugins/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,14 @@ PLUGINS:
f, err := dc.Fs.Open(path)
if err != nil {
if dc.Logger != nil {
dc.Logger.With(
"plugin", path,
"error", err,
).Error("failed to open plugin for reading")
dc.Logger.Error("failed to open plugin for reading", "plugin", path, "error", err)
}
continue
}
md, err := meta.ReadFile(f)
if err != nil {
if dc.Logger != nil {
dc.Logger.With(
"plugin", path,
"error", err,
).Error("failed to read plugin metadata")
dc.Logger.Error("failed to read plugin metadata", "plugin", path, "error", err)
}
f.Close()
continue
Expand All @@ -66,10 +60,7 @@ PLUGINS:
modes, err := meta.QueryPluginModes(md.BinaryPath)
if err != nil {
if dc.Logger != nil {
dc.Logger.With(
"plugin", path,
"error", err,
).Error("failed to query plugin modes")
dc.Logger.Error("failed to query plugin modes", "plugin", path, "error", err)
}
continue
}
Expand All @@ -82,10 +73,7 @@ PLUGINS:
for i, filter := range dc.Filters {
if !filter(md) {
if dc.Logger != nil {
dc.Logger.With(
"plugin", path,
"filter", i,
).Debug("plugin ignored due to filter")
dc.Logger.Debug("plugin ignored due to filter", "plugin", path, "filter", i)
}
continue PLUGINS
}
Expand Down
Loading

0 comments on commit 6ad9c63

Please sign in to comment.