diff --git a/core/consumer/BUILD.bazel b/core/consumer/BUILD.bazel index 392b1006..b52d21d3 100644 --- a/core/consumer/BUILD.bazel +++ b/core/consumer/BUILD.bazel @@ -11,6 +11,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/errs", + "//core/metrics", "//entity/queue", "//extension/queue", "@com_github_uber_go_tally_v4//:tally", diff --git a/core/consumer/consumer.go b/core/consumer/consumer.go index 1fa27142..6170d63e 100644 --- a/core/consumer/consumer.go +++ b/core/consumer/consumer.go @@ -23,6 +23,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/extension/queue" "go.uber.org/zap" ) @@ -329,8 +330,10 @@ func (m *consumer) processPartition(ctx context.Context, controller Controller, // processDelivery calls the controller and performs ack/nack based on the result. func (m *consumer) processDelivery(ctx context.Context, controller Controller, delivery queue.Delivery, controllerScope tally.Scope) { + const opName = "process" + start := time.Now() - controllerScope.Counter("messages_received").Inc(1) + metrics.NamedCounter(controllerScope, opName, "messages_received", 1) msg := delivery.Message() topicKey := controller.TopicKey() @@ -364,10 +367,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d } } - latencyScope := controllerScope.Tagged(map[string]string{ - "success": successTag, - }) - latencyScope.Timer("controller_latency").Record(elapsed) + metrics.NamedTimer(controllerScope, opName, "controller_latency", elapsed, metrics.NewTag("success", successTag)) if err != nil { // Check if the error is non-retryable (poison pill message) @@ -382,7 +382,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d "elapsed_ms", elapsed.Milliseconds(), ) - controllerScope.Counter("non_retryable_errors").Inc(1) + metrics.NamedCounter(controllerScope, opName, "non_retryable_errors", 1) // Reject moves to DLQ (or acks if DLQ disabled) if rejectErr := delivery.Reject(ctx, err.Error()); rejectErr != nil { @@ -392,7 +392,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d "message_id", msg.ID, "error", rejectErr, ) - controllerScope.Counter("reject_errors").Inc(1) + metrics.NamedCounter(controllerScope, opName, "reject_errors", 1) } return } @@ -414,7 +414,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d "elapsed_ms", elapsed.Milliseconds(), ) - controllerScope.Counter("controller_errors").Inc(1) + metrics.NamedCounter(controllerScope, opName, "controller_errors", 1) // Nack with no delay - let visibility timeout handle retry delay nackStart := time.Now() @@ -425,14 +425,13 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d "message_id", msg.ID, "error", nackErr, ) - controllerScope.Counter("nack_errors").Inc(1) + metrics.NamedCounter(controllerScope, opName, "nack_errors", 1) } else { - controllerScope.Counter("nack_count").Inc(1) - nackScope := controllerScope.Tagged(map[string]string{ - "operation": "nack", - "success": "true", - }) - nackScope.Timer("ack_nack_latency").Record(time.Since(nackStart)) + metrics.NamedCounter(controllerScope, opName, "nack_count", 1) + metrics.NamedTimer(controllerScope, opName, "ack_nack_latency", time.Since(nackStart), + metrics.NewTag("operation", "nack"), + metrics.NewTag("success", "true"), + ) } return } @@ -446,23 +445,20 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d "message_id", msg.ID, "error", ackErr, ) - controllerScope.Counter("ack_errors").Inc(1) - ackScope := controllerScope.Tagged(map[string]string{ - "operation": "ack", - "success": "false", - }) - ackScope.Timer("ack_nack_latency").Record(time.Since(ackStart)) + metrics.NamedCounter(controllerScope, opName, "ack_errors", 1) + metrics.NamedTimer(controllerScope, opName, "ack_nack_latency", time.Since(ackStart), + metrics.NewTag("operation", "ack"), + metrics.NewTag("success", "false"), + ) return } - controllerScope.Counter("messages_processed").Inc(1) - controllerScope.Counter("ack_count").Inc(1) - - ackScope := controllerScope.Tagged(map[string]string{ - "operation": "ack", - "success": "true", - }) - ackScope.Timer("ack_nack_latency").Record(time.Since(ackStart)) + metrics.NamedCounter(controllerScope, opName, "messages_processed", 1) + metrics.NamedCounter(controllerScope, opName, "ack_count", 1) + metrics.NamedTimer(controllerScope, opName, "ack_nack_latency", time.Since(ackStart), + metrics.NewTag("operation", "ack"), + metrics.NewTag("success", "true"), + ) m.logger.Debugw("message processed successfully", "controller", controller.Name(), diff --git a/extension/mergechecker/github/BUILD.bazel b/extension/mergechecker/github/BUILD.bazel index ff1a2cba..75515552 100644 --- a/extension/mergechecker/github/BUILD.bazel +++ b/extension/mergechecker/github/BUILD.bazel @@ -10,6 +10,7 @@ go_library( importpath = "github.com/uber/submitqueue/extension/mergechecker/github", visibility = ["//visibility:public"], deps = [ + "//core/metrics", "//entity", "//entity/github", "//extension/mergechecker", diff --git a/extension/mergechecker/github/checker.go b/extension/mergechecker/github/checker.go index 41fed73c..2fc352f7 100644 --- a/extension/mergechecker/github/checker.go +++ b/extension/mergechecker/github/checker.go @@ -23,6 +23,7 @@ import ( "net/http" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" entitygithub "github.com/uber/submitqueue/entity/github" "github.com/uber/submitqueue/extension/mergechecker" @@ -60,17 +61,19 @@ func NewMergeChecker(params Params) mergechecker.MergeChecker { } // Check assesses whether a change can merge cleanly using the GitHub GraphQL API. -func (c *mergeChecker) Check(ctx context.Context, queue string, change entity.Change) (mergechecker.Result, error) { - c.metricsScope.Counter("check_started").Inc(1) +func (c *mergeChecker) Check(ctx context.Context, queue string, change entity.Change) (result mergechecker.Result, retErr error) { + const opName = "check" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() - result := mergechecker.Result{} // Parse all change IDs // TODO: classify parse errors as user errors (non-retryable) vs system errors. changeIDs := make([]entitygithub.ChangeID, 0, len(change.URIs)) for _, rawID := range change.URIs { cid, err := entitygithub.ParseChangeID(rawID) if err != nil { - c.metricsScope.Counter("parse_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "parse_errors", 1) return result, fmt.Errorf("failed to parse change ID %q: %w", rawID, err) } changeIDs = append(changeIDs, cid) @@ -79,26 +82,26 @@ func (c *mergeChecker) Check(ctx context.Context, queue string, change entity.Ch // Fetch PR info from GitHub GraphQL API prInfoMap, err := c.fetchPRInfo(ctx, changeIDs) if err != nil { - c.metricsScope.Counter("graphql_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "graphql_errors", 1) return result, fmt.Errorf("failed to fetch PR info: %w", err) } // Validate PR mergeability mergeable, reason, err := validatePRs(changeIDs, prInfoMap) if err != nil { - c.metricsScope.Counter("validation_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "validation_errors", 1) return result, err } if !mergeable { - c.metricsScope.Counter("not_mergeable").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "not_mergeable", 1) c.logger.Infow("change not mergeable", "queue", queue, "reason", reason, "change_uris", change.URIs, ) } else { - c.metricsScope.Counter("mergeable").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "mergeable", 1) } result.Mergeable = mergeable diff --git a/gateway/controller/BUILD.bazel b/gateway/controller/BUILD.bazel index 627babbd..649c906a 100644 --- a/gateway/controller/BUILD.bazel +++ b/gateway/controller/BUILD.bazel @@ -11,6 +11,7 @@ go_library( deps = [ "//core/consumer", "//core/errs", + "//core/metrics", "//entity", "//entity/queue", "//extension/counter", diff --git a/gateway/controller/land.go b/gateway/controller/land.go index a43bfc93..a57db16e 100644 --- a/gateway/controller/land.go +++ b/gateway/controller/land.go @@ -18,11 +18,11 @@ import ( "context" "errors" "fmt" - "time" "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/counter" @@ -55,7 +55,7 @@ type LandController struct { func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, requestLogStore storage.RequestLogStore, registry consumer.TopicRegistry) *LandController { return &LandController{ logger: logger, - metricsScope: scope, + metricsScope: scope.SubScope("land_controller"), counter: counter, requestLogStore: requestLogStore, registry: registry, @@ -63,13 +63,11 @@ func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter cou } // Land handles the land request and returns a response -func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error) { - start := time.Now() - defer func() { - c.metricsScope.Timer("land_request_latency").Record(time.Since(start)) - }() +func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *pb.LandResponse, retErr error) { + const opName = "land" - c.metricsScope.Counter("land_request_count").Inc(1) + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() // Validate required fields. if req.Queue == "" { @@ -131,7 +129,7 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan "sqid", landRequest.ID, "topic_key", consumer.TopicKeyStart, ) - c.metricsScope.Counter("publish_success").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "publish_success", 1) return &pb.LandResponse{ Sqid: landRequest.ID, diff --git a/orchestrator/controller/BUILD.bazel b/orchestrator/controller/BUILD.bazel index 93f8daad..7b6334b6 100644 --- a/orchestrator/controller/BUILD.bazel +++ b/orchestrator/controller/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/uber/submitqueue/orchestrator/controller", visibility = ["//visibility:public"], deps = [ + "//core/metrics", "//orchestrator/protopb", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", diff --git a/orchestrator/controller/batch/BUILD.bazel b/orchestrator/controller/batch/BUILD.bazel index 22692a1a..8f629c53 100644 --- a/orchestrator/controller/batch/BUILD.bazel +++ b/orchestrator/controller/batch/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/metrics", "//entity", "//entity/queue", "//extension/conflict", diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index 2dadde22..23b2e0ef 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -20,6 +20,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/conflict" @@ -71,22 +72,25 @@ func NewController( // Process processes a batch delivery from the queue. // Deserializes the request, groups into batch, and publishes to the score topic. // Returns nil to ack (success), or error to nack (retry). -func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - c.metricsScope.Counter("received").Inc(1) +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() msg := delivery.Message() // Deserialize request ID from payload rid, err := entity.RequestIDFromBytes(msg.Payload) if err != nil { - c.metricsScope.Counter("deserialize_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) return fmt.Errorf("failed to deserialize request ID: %w", err) } // Fetch request from storage request, err := c.store.GetRequestStore().Get(ctx, rid.ID) if err != nil { - c.metricsScope.Counter("storage_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to get request %s: %w", rid.ID, err) } @@ -104,7 +108,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Generate a globally unique batch ID. seq, err := c.counter.Next(ctx, "batch/"+request.Queue) if err != nil { - c.metricsScope.Counter("counter_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "counter_errors", 1) return fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err) } @@ -125,7 +129,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er entity.BatchStateMerging, }) if err != nil { - c.metricsScope.Counter("batch_store_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1) return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err) } @@ -134,7 +138,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // apply; the dependency graph only tracks the relation. conflicts, err := c.analyzer.Analyze(ctx, batch, activeBatches) if err != nil { - c.metricsScope.Counter("conflict_analyzer_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "conflict_analyzer_errors", 1) return fmt.Errorf("failed to analyze conflicts for batchID=%s: %w", batch.ID, err) } @@ -155,7 +159,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er for _, depID := range conflictingIDs { existing, err := c.store.GetBatchDependentStore().Get(ctx, depID) if err != nil { - c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "batch_dependent_store_errors", 1) return fmt.Errorf("failed to get batch dependent for batchID=%s: %w", depID, err) } @@ -163,7 +167,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er newVersion := existing.Version + 1 if err := c.store.GetBatchDependentStore().UpdateDependents(ctx, depID, existing.Version, newVersion, dependents); err != nil { - c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "batch_dependent_store_errors", 1) return fmt.Errorf("failed to update batch dependent index for existing batchID=%s and new batchID=%s: %w", depID, batch.ID, err) } } @@ -176,7 +180,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er } if err := c.store.GetBatchDependentStore().Create(ctx, bd); err != nil { - c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "batch_dependent_store_errors", 1) return fmt.Errorf("failed to create batch dependent index for new batchID=%s: %w", batch.ID, err) } @@ -184,7 +188,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // This is the final operation that concludes the batch creation process. If it fails, BatchDependents will be pointing to a batch id that does not exist. // We do not reuse batch ids, a retry of this operation will create a new batch with a new ID. The downstream logic that operates on BatchDependent should be able to handle stale entries. if err := c.store.GetBatchStore().Create(ctx, batch); err != nil { - c.metricsScope.Counter("batch_store_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1) return fmt.Errorf("failed to create batch in batch store: %w", err) } @@ -199,7 +203,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // If it fails and the controller retries, a new batch will be created with the new batch ID but the same request ID. // The downstream logic should be able to handle stale entries by looking at the state of the batch. if err := c.publish(ctx, consumer.TopicKeyScore, batch.ID, batch.Queue); err != nil { - c.metricsScope.Counter("publish_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish batch ID to score topic: %w", err) } @@ -208,8 +212,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "topic_key", consumer.TopicKeyScore, ) - c.metricsScope.Counter("processed").Inc(1) - return nil // Success - message will be acked } diff --git a/orchestrator/controller/build/BUILD.bazel b/orchestrator/controller/build/BUILD.bazel index 9767bc59..50c0f3e4 100644 --- a/orchestrator/controller/build/BUILD.bazel +++ b/orchestrator/controller/build/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/metrics", "//entity", "//entity/queue", "//extension/storage", diff --git a/orchestrator/controller/build/build.go b/orchestrator/controller/build/build.go index 8bef78fa..48d4cf16 100644 --- a/orchestrator/controller/build/build.go +++ b/orchestrator/controller/build/build.go @@ -20,6 +20,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/storage" @@ -63,22 +64,25 @@ func NewController( // Process processes a build delivery from the queue. // Deserializes the batch, triggers a build, and publishes a build entity to the build signal topic. // Returns nil to ack (success), or error to nack (retry). -func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - c.metricsScope.Counter("received").Inc(1) +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() msg := delivery.Message() // Deserialize batch ID from payload bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { - c.metricsScope.Counter("deserialize_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) return fmt.Errorf("failed to deserialize batch ID: %w", err) } // Fetch batch from storage batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) if err != nil { - c.metricsScope.Counter("storage_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } @@ -103,7 +107,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Publish build to build signal topic if err := c.publish(ctx, consumer.TopicKeyBuildSignal, build); err != nil { - c.metricsScope.Counter("publish_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to buildsignal: %w", err) } @@ -113,8 +117,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "topic_key", consumer.TopicKeyBuildSignal, ) - c.metricsScope.Counter("processed").Inc(1) - return nil // Success - message will be acked } diff --git a/orchestrator/controller/buildsignal/BUILD.bazel b/orchestrator/controller/buildsignal/BUILD.bazel index 73c973a8..df5bb9dd 100644 --- a/orchestrator/controller/buildsignal/BUILD.bazel +++ b/orchestrator/controller/buildsignal/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/metrics", "//entity", "//entity/queue", "//extension/storage", diff --git a/orchestrator/controller/buildsignal/buildsignal.go b/orchestrator/controller/buildsignal/buildsignal.go index c7b4db87..c7b92e53 100644 --- a/orchestrator/controller/buildsignal/buildsignal.go +++ b/orchestrator/controller/buildsignal/buildsignal.go @@ -20,6 +20,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/storage" @@ -63,15 +64,18 @@ func NewController( // Process processes a build signal delivery from the queue. // Deserializes the build and publishes a batch result to the speculate topic. // Returns nil to ack (success), or error to nack (retry). -func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - c.metricsScope.Counter("received").Inc(1) +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() msg := delivery.Message() // Deserialize build entity build, err := entity.BuildFromBytes(msg.Payload) if err != nil { - c.metricsScope.Counter("deserialize_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) // Non-retryable: malformed messages will never succeed regardless of retry count return fmt.Errorf("failed to deserialize build: %w", err) } @@ -91,13 +95,13 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Fetch batch from storage to get the partition key (queue) batch, err := c.store.GetBatchStore().Get(ctx, build.BatchID) if err != nil { - c.metricsScope.Counter("storage_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to get batch %s: %w", build.BatchID, err) } // Publish batch to speculate topic if err := c.publish(ctx, consumer.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { - c.metricsScope.Counter("publish_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to speculate: %w", err) } @@ -107,8 +111,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "topic_key", consumer.TopicKeySpeculate, ) - c.metricsScope.Counter("processed").Inc(1) - return nil // Success - message will be acked } diff --git a/orchestrator/controller/log/BUILD.bazel b/orchestrator/controller/log/BUILD.bazel index d891b39c..58258cac 100644 --- a/orchestrator/controller/log/BUILD.bazel +++ b/orchestrator/controller/log/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/metrics", "//entity", "//extension/storage", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/log/log.go b/orchestrator/controller/log/log.go index 3edd412e..3660b1bd 100644 --- a/orchestrator/controller/log/log.go +++ b/orchestrator/controller/log/log.go @@ -20,6 +20,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" @@ -59,15 +60,18 @@ func NewController( // Process processes a log delivery from the queue. // Deserializes the request log entry and persists it to storage. // Returns nil to ack (success), or error to nack (retry). -func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - c.metricsScope.Counter("received").Inc(1) +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() msg := delivery.Message() // Deserialize request log entry logEntry, err := entity.RequestLogFromBytes(msg.Payload) if err != nil { - c.metricsScope.Counter("deserialize_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) // Non-retryable: malformed messages will never succeed regardless of retry count return fmt.Errorf("failed to deserialize request log: %w", err) } @@ -81,12 +85,10 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Persist request log to storage if err := c.store.GetRequestLogStore().Insert(ctx, logEntry); err != nil { - c.metricsScope.Counter("storage_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to insert request log: %w", err) } - c.metricsScope.Counter("processed").Inc(1) - return nil // Success - message will be acked } diff --git a/orchestrator/controller/ping.go b/orchestrator/controller/ping.go index b824426b..cc57c286 100644 --- a/orchestrator/controller/ping.go +++ b/orchestrator/controller/ping.go @@ -20,6 +20,7 @@ import ( "time" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" pb "github.com/uber/submitqueue/orchestrator/protopb" "go.uber.org/zap" ) @@ -39,20 +40,18 @@ func NewPingController(logger *zap.Logger, scope tally.Scope) *PingController { } // Ping handles the ping request and returns a response -func (c *PingController) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) { - start := time.Now() - defer func() { - c.metricsScope.Timer("ping_latency").Record(time.Since(start)) - }() +func (c *PingController) Ping(ctx context.Context, req *pb.PingRequest) (resp *pb.PingResponse, retErr error) { + const opName = "ping" - c.metricsScope.Counter("ping_requests_total").Inc(1) + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() message := "pong!" isEcho := false if req.Message != "" { message = "echo: " + req.Message isEcho = true - c.metricsScope.Counter("echo_requests_total").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "echo_requests", 1) } hostname, _ := os.Hostname() diff --git a/orchestrator/controller/score/BUILD.bazel b/orchestrator/controller/score/BUILD.bazel index 26938183..661f8c14 100644 --- a/orchestrator/controller/score/BUILD.bazel +++ b/orchestrator/controller/score/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/metrics", "//core/request", "//entity", "//entity/queue", diff --git a/orchestrator/controller/score/score.go b/orchestrator/controller/score/score.go index 87d5d222..a6f06b85 100644 --- a/orchestrator/controller/score/score.go +++ b/orchestrator/controller/score/score.go @@ -20,6 +20,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/metrics" corerequest "github.com/uber/submitqueue/core/request" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" @@ -71,22 +72,25 @@ func NewController( // persists the minimum score, publishes request log entries, // and publishes to the speculate topic. // Returns nil to ack (success), or error to nack (retry). -func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - c.metricsScope.Counter("received").Inc(1) +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() msg := delivery.Message() // Deserialize batch ID from payload bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { - c.metricsScope.Counter("deserialize_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) return fmt.Errorf("failed to deserialize batch ID: %w", err) } // Fetch batch from storage batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) if err != nil { - c.metricsScope.Counter("storage_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } @@ -102,14 +106,14 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Score each request's change and take the minimum (worst-case) as the batch score batchScore, err := c.scoreBatch(ctx, batch) if err != nil { - c.metricsScope.Counter("scorer_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "scorer_errors", 1) return fmt.Errorf("failed to score batch %s: %w", batch.ID, err) } // Atomically update score and state to "scored" in the database newVersion := batch.Version + 1 if err := c.store.GetBatchStore().UpdateScoreAndState(ctx, batch.ID, batch.Version, newVersion, batchScore, entity.BatchStateScored); err != nil { - c.metricsScope.Counter("storage_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to update score for batch %s: %w", batch.ID, err) } batch.Version = newVersion @@ -124,13 +128,13 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "batch_id": batch.ID, "score": fmt.Sprintf("%.4f", batchScore), }); err != nil { - c.metricsScope.Counter("request_log_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "request_log_errors", 1) return fmt.Errorf("failed to publish request logs for batch %s: %w", batch.ID, err) } // Publish to speculate topic if err := c.publish(ctx, consumer.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { - c.metricsScope.Counter("publish_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to speculate: %w", err) } @@ -139,8 +143,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "topic_key", consumer.TopicKeySpeculate, ) - c.metricsScope.Counter("processed").Inc(1) - return nil // Success - message will be acked } diff --git a/orchestrator/controller/speculate/BUILD.bazel b/orchestrator/controller/speculate/BUILD.bazel index 955780c4..6aa8e7e7 100644 --- a/orchestrator/controller/speculate/BUILD.bazel +++ b/orchestrator/controller/speculate/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/metrics", "//entity", "//entity/queue", "//extension/storage", diff --git a/orchestrator/controller/speculate/speculate.go b/orchestrator/controller/speculate/speculate.go index 2e8145e8..d5250be1 100644 --- a/orchestrator/controller/speculate/speculate.go +++ b/orchestrator/controller/speculate/speculate.go @@ -20,6 +20,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/storage" @@ -56,6 +57,9 @@ type Controller struct { // Verify Controller implements consumer.Controller interface at compile time. var _ consumer.Controller = (*Controller)(nil) +// opName is the metric operation name shared by every emit in this file. +const opName = "process" + // NewController creates a new speculate controller for the orchestrator. func NewController( logger *zap.SugaredLogger, @@ -77,20 +81,21 @@ func NewController( // Process advances a batch one step along the naive happy-path. // Returns nil to ack (success), or error to nack (retry). -func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - c.metricsScope.Counter("received").Inc(1) +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() msg := delivery.Message() bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { - c.metricsScope.Counter("deserialize_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) return fmt.Errorf("failed to deserialize batch ID: %w", err) } batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) if err != nil { - c.metricsScope.Counter("storage_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } @@ -99,13 +104,13 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // otherwise never reconcile. Re-publishing is safe because conclude is // idempotent on the batch ID. if batch.State.IsTerminal() { - c.metricsScope.Counter("self_heal_terminal").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "self_heal_terminal", 1) return c.fanout(ctx, batch.ID, batch.Queue) } // Merging is owned by the merge controller, which has its own self-heal. if batch.State == entity.BatchStateMerging { - c.metricsScope.Counter("noop_merging").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "noop_merging", 1) return nil } @@ -115,7 +120,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er case entity.BatchStateSpeculating: return c.tryFinalize(ctx, batch) default: - c.metricsScope.Counter("unexpected_state").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "unexpected_state", 1) return fmt.Errorf("unexpected batch state %q for batch %s", batch.State, batch.ID) } } @@ -129,7 +134,7 @@ func (c *Controller) startSpeculation(ctx context.Context, batch entity.Batch) e ) if err := c.publish(ctx, consumer.TopicKeyBuild, batch.ID, batch.Queue); err != nil { - c.metricsScope.Counter("publish_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to build: %w", err) } @@ -137,11 +142,11 @@ func (c *Controller) startSpeculation(ctx context.Context, batch entity.Batch) e // the next event will see the new state and behave correctly. newVersion := batch.Version + 1 if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateSpeculating); err != nil { - c.metricsScope.Counter("storage_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to update batch %s state to speculating: %w", batch.ID, err) } - c.metricsScope.Counter("started_speculation").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "started_speculation", 1) return nil } @@ -174,7 +179,7 @@ func (c *Controller) tryFinalize(ctx context.Context, batch entity.Batch) error } if len(pending) > 0 { - c.metricsScope.Counter("waiting_on_deps").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "waiting_on_deps", 1) c.logger.Debugw("dependencies still in flight; waiting", "batch_id", batch.ID, "pending_dependency_ids", pending, @@ -183,17 +188,16 @@ func (c *Controller) tryFinalize(ctx context.Context, batch entity.Batch) error } if err := c.publish(ctx, consumer.TopicKeyMerge, batch.ID, batch.Queue); err != nil { - c.metricsScope.Counter("publish_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to merge: %w", err) } newVersion := batch.Version + 1 if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateMerging); err != nil { - c.metricsScope.Counter("storage_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to update batch %s state to merging: %w", batch.ID, err) } - c.metricsScope.Counter("processed").Inc(1) return nil } @@ -203,7 +207,7 @@ func (c *Controller) tryFinalize(ctx context.Context, batch entity.Batch) error // Without this transition the batch would sit in Speculating forever — no // downstream event ever fires for it again. func (c *Controller) failOnDependency(ctx context.Context, batch entity.Batch, dep entity.Batch) error { - c.metricsScope.Counter("dependency_failed").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "dependency_failed", 1) c.logger.Warnw("dependency in non-succeeding terminal state; failing batch", "batch_id", batch.ID, "dependency_id", dep.ID, @@ -212,12 +216,12 @@ func (c *Controller) failOnDependency(ctx context.Context, batch entity.Batch, d newVersion := batch.Version + 1 if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateFailed); err != nil { - c.metricsScope.Counter("storage_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to update batch %s state to failed: %w", batch.ID, err) } if err := c.publish(ctx, consumer.TopicKeyConclude, batch.ID, batch.Queue); err != nil { - c.metricsScope.Counter("publish_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to conclude: %w", err) } @@ -233,7 +237,7 @@ func (c *Controller) fetchDependencies(ctx context.Context, batch entity.Batch) for _, depID := range batch.Dependencies { d, err := c.store.GetBatchStore().Get(ctx, depID) if err != nil { - c.metricsScope.Counter("dependency_fetch_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "dependency_fetch_errors", 1) return nil, fmt.Errorf("failed to get dependency batch %s of %s: %w", depID, batch.ID, err) } deps = append(deps, d) @@ -246,7 +250,7 @@ func (c *Controller) fetchDependencies(ctx context.Context, batch entity.Batch) // re-sending to conclude guarantees request-state reconciliation. func (c *Controller) fanout(ctx context.Context, batchID, partitionKey string) error { if err := c.publish(ctx, consumer.TopicKeyConclude, batchID, partitionKey); err != nil { - c.metricsScope.Counter("publish_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to conclude: %w", err) } return nil diff --git a/orchestrator/controller/start/BUILD.bazel b/orchestrator/controller/start/BUILD.bazel index 85ce2a89..cb11af3e 100644 --- a/orchestrator/controller/start/BUILD.bazel +++ b/orchestrator/controller/start/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/metrics", "//core/request", "//entity", "//entity/queue", diff --git a/orchestrator/controller/start/start.go b/orchestrator/controller/start/start.go index f3d449a8..3ffbe1a3 100644 --- a/orchestrator/controller/start/start.go +++ b/orchestrator/controller/start/start.go @@ -22,6 +22,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/metrics" corerequest "github.com/uber/submitqueue/core/request" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" @@ -73,14 +74,17 @@ func NewController( // Process processes a request delivery from the queue. // Persists the request, claims its URIs in the change store, and publishes to validate. // Returns nil to ack (success), or error to nack (retry). -func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - c.metricsScope.Counter("received").Inc(1) +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() msg := delivery.Message() landRequest, err := entity.LandRequestFromBytes(msg.Payload) if err != nil { - c.metricsScope.Counter("deserialize_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) // Non-retryable: malformed messages will never succeed regardless of retry count return fmt.Errorf("failed to deserialize land request: %w", err) } @@ -109,7 +113,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Persist request to storage. ErrAlreadyExists means a queue redelivery of the same // request_id (an at-least-once retry of THIS message), not a cross-request collision. if err := c.store.GetRequestStore().Create(ctx, request); err != nil && !errors.Is(err, storage.ErrAlreadyExists) { - c.metricsScope.Counter("storage_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to create request: %w", err) } @@ -118,19 +122,19 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // collide on insert; the validate controller surfaces it via GetByURI + a // liveness check against the request store. if err := c.claimURIs(ctx, request); err != nil { - c.metricsScope.Counter("change_store_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "change_store_errors", 1) return fmt.Errorf("failed to claim URIs for request %s: %w", request.ID, err) } // Record the "new" status in the request log. logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusStarted, request.Version, "", nil) if err := corerequest.PublishLog(ctx, c.registry, logEntry, request.ID); err != nil { - c.metricsScope.Counter("request_log_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "request_log_errors", 1) return fmt.Errorf("failed to publish request log: %w", err) } if err := c.publish(ctx, consumer.TopicKeyValidate, request.ID, request.Queue); err != nil { - c.metricsScope.Counter("publish_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) return fmt.Errorf("failed to publish to validate: %w", err) } @@ -139,7 +143,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "topic_key", consumer.TopicKeyValidate, ) - c.metricsScope.Counter("processed").Inc(1) return nil } diff --git a/stovepipe/controller/BUILD.bazel b/stovepipe/controller/BUILD.bazel index b86190f4..685b0c20 100644 --- a/stovepipe/controller/BUILD.bazel +++ b/stovepipe/controller/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/uber/submitqueue/stovepipe/controller", visibility = ["//visibility:public"], deps = [ + "//core/metrics", "//stovepipe/protopb", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", diff --git a/stovepipe/controller/ping.go b/stovepipe/controller/ping.go index c85bedae..41d216d4 100644 --- a/stovepipe/controller/ping.go +++ b/stovepipe/controller/ping.go @@ -20,6 +20,7 @@ import ( "time" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" pb "github.com/uber/submitqueue/stovepipe/protopb" "go.uber.org/zap" ) @@ -39,20 +40,18 @@ func NewPingController(logger *zap.Logger, scope tally.Scope) *PingController { } // Ping handles the ping request and returns a response -func (c *PingController) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) { - start := time.Now() - defer func() { - c.metricsScope.Timer("ping_latency").Record(time.Since(start)) - }() +func (c *PingController) Ping(ctx context.Context, req *pb.PingRequest) (resp *pb.PingResponse, retErr error) { + const opName = "ping" - c.metricsScope.Counter("ping_requests_total").Inc(1) + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() message := "pong!" isEcho := false if req.Message != "" { message = "echo: " + req.Message isEcho = true - c.metricsScope.Counter("echo_requests_total").Inc(1) + metrics.NamedCounter(c.metricsScope, opName, "echo_requests", 1) } hostname, _ := os.Hostname()