From ed471cbf510c1f0c8363417bf98dd67a2c75a0ea Mon Sep 17 00:00:00 2001 From: sergeyb Date: Tue, 10 Mar 2026 02:33:12 +0000 Subject: [PATCH] refactor(logging): Do not double log and do not classify errors --- gateway/controller/land.go | 5 ---- orchestrator/controller/batch/BUILD.bazel | 1 - orchestrator/controller/batch/batch.go | 28 +------------------ orchestrator/controller/build/BUILD.bazel | 1 - orchestrator/controller/build/build.go | 15 +--------- .../controller/buildsignal/BUILD.bazel | 1 - .../controller/buildsignal/buildsignal.go | 15 +--------- orchestrator/controller/conclude/BUILD.bazel | 1 - orchestrator/controller/conclude/conclude.go | 27 ++---------------- .../controller/conclude/conclude_test.go | 4 +-- orchestrator/controller/merge/BUILD.bazel | 1 - orchestrator/controller/merge/merge.go | 21 ++------------ orchestrator/controller/score/BUILD.bazel | 1 - orchestrator/controller/score/score.go | 14 +--------- orchestrator/controller/speculate/BUILD.bazel | 1 - .../controller/speculate/speculate.go | 21 ++------------ orchestrator/controller/start/BUILD.bazel | 1 - orchestrator/controller/start/start.go | 20 ++----------- orchestrator/controller/start/start_test.go | 2 +- orchestrator/controller/validate/validate.go | 17 +---------- 20 files changed, 16 insertions(+), 181 deletions(-) diff --git a/gateway/controller/land.go b/gateway/controller/land.go index 1bbc82fc..fdaedab4 100644 --- a/gateway/controller/land.go +++ b/gateway/controller/land.go @@ -124,11 +124,6 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan // Publish to queue for async processing if err := c.publishToQueue(ctx, landRequest); err != nil { - c.logger.Errorw("failed to publish request to queue", - "queue", req.Queue, - "sqid", landRequest.ID, - "error", err, - ) return nil, fmt.Errorf("LandController failed to publish request to queue: %w", err) } diff --git a/orchestrator/controller/batch/BUILD.bazel b/orchestrator/controller/batch/BUILD.bazel index f5a824d3..5138cc41 100644 --- a/orchestrator/controller/batch/BUILD.bazel +++ b/orchestrator/controller/batch/BUILD.bazel @@ -7,7 +7,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", - "//core/errs", "//entity", "//entity/queue", "//extension/counter", diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index 1f10522d..60767ba3 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -20,7 +20,6 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/counter" @@ -76,12 +75,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Deserialize request entity request, err := entity.RequestFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize request", - "message_id", msg.ID, - "partition_key", msg.PartitionKey, - "attempt", delivery.Attempt(), - "error", err, - ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count return fmt.Errorf("failed to deserialize request: %w", err) @@ -99,11 +92,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Generate a globally unique batch ID. seq, err := c.counter.Next(ctx, "batch/"+request.Queue) if err != nil { - c.logger.Errorw("failed to generate batch ID", - "request_id", request.ID, - "queue", request.Queue, - "error", err, - ) c.metricsScope.Counter("counter_errors").Inc(1) return fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err) } @@ -123,11 +111,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er entity.BatchStateFinalizing, }) if err != nil { - c.logger.Errorw("failed to get active batches", - "request_id", request.ID, - "queue", request.Queue, - "error", err, - ) c.metricsScope.Counter("batch_store_errors").Inc(1) return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err) } @@ -150,10 +133,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er existing, err := c.store.GetBatchDependentStore().Get(ctx, dep.ID) if err != nil && !storage.IsNotFound(err) { - c.logger.Errorw("failed to get existing batch dependent", - "batch_id", dep.ID, - "error", err, - ) c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) return fmt.Errorf("failed to get batch dependent for batchID=%s: %w", dep.ID, err) } @@ -175,13 +154,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Publish to score topic if err := c.publish(ctx, consumer.TopicKeyScore, batch); err != nil { - c.logger.Errorw("failed to publish output", - "batch_id", batch.ID, - "topic_key", consumer.TopicKeyScore, - "error", err, - ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to score: %w", err)) + return fmt.Errorf("failed to publish to score: %w", err) } c.logger.Infow("published batch to score", diff --git a/orchestrator/controller/build/BUILD.bazel b/orchestrator/controller/build/BUILD.bazel index 85ea7175..a47bcb7e 100644 --- a/orchestrator/controller/build/BUILD.bazel +++ b/orchestrator/controller/build/BUILD.bazel @@ -7,7 +7,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", - "//core/errs", "//entity", "//entity/queue", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/build/build.go b/orchestrator/controller/build/build.go index 4a8ef2a9..eab2b780 100644 --- a/orchestrator/controller/build/build.go +++ b/orchestrator/controller/build/build.go @@ -20,7 +20,6 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "go.uber.org/zap" @@ -68,12 +67,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Deserialize batch entity batch, err := entity.BatchFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize batch", - "message_id", msg.ID, - "partition_key", msg.PartitionKey, - "attempt", delivery.Attempt(), - "error", err, - ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count return fmt.Errorf("failed to deserialize batch: %w", err) @@ -100,14 +93,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Publish build to build signal topic if err := c.publish(ctx, consumer.TopicKeyBuildSignal, build); err != nil { - c.logger.Errorw("failed to publish output", - "batch_id", batch.ID, - "build_id", build.ID, - "topic_key", consumer.TopicKeyBuildSignal, - "error", err, - ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to buildsignal: %w", err)) + return fmt.Errorf("failed to publish to buildsignal: %w", err) } c.logger.Infow("published build to buildsignal", diff --git a/orchestrator/controller/buildsignal/BUILD.bazel b/orchestrator/controller/buildsignal/BUILD.bazel index ed57d1a2..0ca992b7 100644 --- a/orchestrator/controller/buildsignal/BUILD.bazel +++ b/orchestrator/controller/buildsignal/BUILD.bazel @@ -7,7 +7,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", - "//core/errs", "//entity", "//entity/queue", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/buildsignal/buildsignal.go b/orchestrator/controller/buildsignal/buildsignal.go index 40bc52a3..95a0a686 100644 --- a/orchestrator/controller/buildsignal/buildsignal.go +++ b/orchestrator/controller/buildsignal/buildsignal.go @@ -20,7 +20,6 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "go.uber.org/zap" @@ -68,12 +67,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Deserialize build entity build, err := entity.BuildFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize build", - "message_id", msg.ID, - "partition_key", msg.PartitionKey, - "attempt", delivery.Attempt(), - "error", err, - ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count return fmt.Errorf("failed to deserialize build: %w", err) @@ -97,14 +90,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Publish batch to speculate topic if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil { - c.logger.Errorw("failed to publish output", - "build_id", build.ID, - "batch_id", build.BatchID, - "topic_key", consumer.TopicKeySpeculate, - "error", err, - ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to speculate: %w", err)) + return fmt.Errorf("failed to publish to speculate: %w", err) } c.logger.Infow("published batch to speculate", diff --git a/orchestrator/controller/conclude/BUILD.bazel b/orchestrator/controller/conclude/BUILD.bazel index df2075f9..4e39e77c 100644 --- a/orchestrator/controller/conclude/BUILD.bazel +++ b/orchestrator/controller/conclude/BUILD.bazel @@ -7,7 +7,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", - "//core/errs", "//core/metrics", "//entity", "//extension/storage", diff --git a/orchestrator/controller/conclude/conclude.go b/orchestrator/controller/conclude/conclude.go index e7aac4cc..f98b537f 100644 --- a/orchestrator/controller/conclude/conclude.go +++ b/orchestrator/controller/conclude/conclude.go @@ -20,7 +20,6 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/storage" @@ -73,12 +72,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r // Deserialize batch entity batch, err := entity.BatchFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize batch", - "message_id", msg.ID, - "partition_key", msg.PartitionKey, - "attempt", delivery.Attempt(), - "error", err, - ) // Non-retryable: malformed messages will never succeed regardless of retry count metrics.NamedCounter(c.metricsScope, "process", "deserialize_errors", 1) return fmt.Errorf("failed to deserialize batch: %w", err) @@ -100,10 +93,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r // as updated by the merge controller. requestState, err := batchStateToRequestState(batch.State) if err != nil { - c.logger.Errorw("unexpected batch state", - "batch_id", batch.ID, - "state", string(batch.State), - ) metrics.NamedCounter(c.metricsScope, "process", "unexpected_state_errors", 1) return fmt.Errorf("unexpected batch state %q for batch %s: %w", batch.State, batch.ID, err) } @@ -112,25 +101,13 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r for _, requestID := range batch.Contains { request, err := c.store.GetRequestStore().Get(ctx, requestID) if err != nil { - c.logger.Errorw("failed to get request from storage", - "batch_id", batch.ID, - "request_id", requestID, - "error", err, - ) metrics.NamedCounter(c.metricsScope, "process", "request_store_errors", 1) - return errs.NewRetryableError(fmt.Errorf("failed to get request %s: %w", requestID, err)) + return fmt.Errorf("failed to get request %s: %w", requestID, err) } if err := c.store.GetRequestStore().UpdateState(ctx, requestID, request.Version, requestState); err != nil { - c.logger.Errorw("failed to update request state", - "batch_id", batch.ID, - "request_id", requestID, - "from_version", request.Version, - "to_state", string(requestState), - "error", err, - ) metrics.NamedCounter(c.metricsScope, "process", "request_update_errors", 1) - return errs.NewRetryableError(fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err)) + return fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err) } c.logger.Infow("updated request state", diff --git a/orchestrator/controller/conclude/conclude_test.go b/orchestrator/controller/conclude/conclude_test.go index 936b3ffc..7d1439ae 100644 --- a/orchestrator/controller/conclude/conclude_test.go +++ b/orchestrator/controller/conclude/conclude_test.go @@ -156,7 +156,7 @@ func TestController_Process(t *testing.T) { return mockStorage }, wantErr: true, - retryable: true, + retryable: false, }, { name: "request store update failure is retryable", @@ -179,7 +179,7 @@ func TestController_Process(t *testing.T) { return mockStorage }, wantErr: true, - retryable: true, + retryable: false, }, { name: "empty contains list succeeds", diff --git a/orchestrator/controller/merge/BUILD.bazel b/orchestrator/controller/merge/BUILD.bazel index 3f2c16b5..f388af3f 100644 --- a/orchestrator/controller/merge/BUILD.bazel +++ b/orchestrator/controller/merge/BUILD.bazel @@ -7,7 +7,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", - "//core/errs", "//entity", "//entity/queue", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go index 1bfc0b53..7d1f1949 100644 --- a/orchestrator/controller/merge/merge.go +++ b/orchestrator/controller/merge/merge.go @@ -20,7 +20,6 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "go.uber.org/zap" @@ -68,12 +67,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Deserialize batch entity batch, err := entity.BatchFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize batch", - "message_id", msg.ID, - "partition_key", msg.PartitionKey, - "attempt", delivery.Attempt(), - "error", err, - ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count return fmt.Errorf("failed to deserialize batch: %w", err) @@ -94,13 +87,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Publish to conclude topic if err := c.publish(ctx, consumer.TopicKeyConclude, batch); err != nil { - c.logger.Errorw("failed to publish to conclude", - "batch_id", batch.ID, - "topic_key", consumer.TopicKeyConclude, - "error", err, - ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to conclude: %w", err)) + return fmt.Errorf("failed to publish to conclude: %w", err) } c.logger.Infow("published batch to conclude", @@ -110,13 +98,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Publish to speculate topic if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil { - c.logger.Errorw("failed to publish to speculate", - "batch_id", batch.ID, - "topic_key", consumer.TopicKeySpeculate, - "error", err, - ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to speculate: %w", err)) + return fmt.Errorf("failed to publish to speculate: %w", err) } c.logger.Infow("published batch to speculate", diff --git a/orchestrator/controller/score/BUILD.bazel b/orchestrator/controller/score/BUILD.bazel index 43829396..525da050 100644 --- a/orchestrator/controller/score/BUILD.bazel +++ b/orchestrator/controller/score/BUILD.bazel @@ -7,7 +7,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", - "//core/errs", "//entity", "//entity/queue", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/score/score.go b/orchestrator/controller/score/score.go index 7af25199..2750c795 100644 --- a/orchestrator/controller/score/score.go +++ b/orchestrator/controller/score/score.go @@ -20,7 +20,6 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "go.uber.org/zap" @@ -68,12 +67,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Deserialize batch entity batch, err := entity.BatchFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize batch", - "message_id", msg.ID, - "partition_key", msg.PartitionKey, - "attempt", delivery.Attempt(), - "error", err, - ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count return fmt.Errorf("failed to deserialize batch: %w", err) @@ -94,13 +87,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Publish to speculate topic if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil { - c.logger.Errorw("failed to publish output", - "batch_id", batch.ID, - "topic_key", consumer.TopicKeySpeculate, - "error", err, - ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to speculate: %w", err)) + return fmt.Errorf("failed to publish to speculate: %w", err) } c.logger.Infow("published batch to speculate", diff --git a/orchestrator/controller/speculate/BUILD.bazel b/orchestrator/controller/speculate/BUILD.bazel index e2defe21..013ff592 100644 --- a/orchestrator/controller/speculate/BUILD.bazel +++ b/orchestrator/controller/speculate/BUILD.bazel @@ -7,7 +7,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", - "//core/errs", "//entity", "//entity/queue", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/speculate/speculate.go b/orchestrator/controller/speculate/speculate.go index f352cb87..7caf8b3c 100644 --- a/orchestrator/controller/speculate/speculate.go +++ b/orchestrator/controller/speculate/speculate.go @@ -20,7 +20,6 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "go.uber.org/zap" @@ -68,12 +67,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Deserialize batch entity batch, err := entity.BatchFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize batch", - "message_id", msg.ID, - "partition_key", msg.PartitionKey, - "attempt", delivery.Attempt(), - "error", err, - ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count return fmt.Errorf("failed to deserialize batch: %w", err) @@ -96,13 +89,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Publish to build topic if err := c.publish(ctx, consumer.TopicKeyBuild, batch); err != nil { - c.logger.Errorw("failed to publish to build", - "batch_id", batch.ID, - "topic_key", consumer.TopicKeyBuild, - "error", err, - ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to build: %w", err)) + return fmt.Errorf("failed to publish to build: %w", err) } c.logger.Infow("published batch to build", @@ -112,13 +100,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Publish to merge topic if err := c.publish(ctx, consumer.TopicKeyMerge, batch); err != nil { - c.logger.Errorw("failed to publish to merge", - "batch_id", batch.ID, - "topic_key", consumer.TopicKeyMerge, - "error", err, - ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to merge: %w", err)) + return fmt.Errorf("failed to publish to merge: %w", err) } c.logger.Infow("published batch to merge", diff --git a/orchestrator/controller/start/BUILD.bazel b/orchestrator/controller/start/BUILD.bazel index 4b42243d..416ec03a 100644 --- a/orchestrator/controller/start/BUILD.bazel +++ b/orchestrator/controller/start/BUILD.bazel @@ -7,7 +7,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", - "//core/errs", "//entity", "//entity/queue", "//extension/storage", diff --git a/orchestrator/controller/start/start.go b/orchestrator/controller/start/start.go index cc29a87f..1bb3e584 100644 --- a/orchestrator/controller/start/start.go +++ b/orchestrator/controller/start/start.go @@ -21,7 +21,6 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/storage" @@ -73,12 +72,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Deserialize land request from gateway landRequest, err := entity.LandRequestFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize land request", - "message_id", msg.ID, - "partition_key", msg.PartitionKey, - "attempt", delivery.Attempt(), - "error", err, - ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count return fmt.Errorf("failed to deserialize land request: %w", err) @@ -108,12 +101,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Persist request to storage (idempotent — ErrAlreadyExists means a retry) if err := c.store.GetRequestStore().Create(ctx, request); err != nil && !errors.Is(err, storage.ErrAlreadyExists) { - c.logger.Errorw("failed to create request in storage", - "request_id", request.ID, - "error", err, - ) c.metricsScope.Counter("storage_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to create request: %w", err)) + return fmt.Errorf("failed to create request: %w", err) } // Record the "new" status in the request log @@ -127,13 +116,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Publish to validate topic if err := c.publish(ctx, consumer.TopicKeyValidate, request); err != nil { - c.logger.Errorw("failed to publish output", - "request_id", request.ID, - "topic_key", consumer.TopicKeyValidate, - "error", err, - ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to validate: %w", err)) + return fmt.Errorf("failed to publish to validate: %w", err) } c.logger.Infow("published request to validate", diff --git a/orchestrator/controller/start/start_test.go b/orchestrator/controller/start/start_test.go index 78a9c56e..b294f09c 100644 --- a/orchestrator/controller/start/start_test.go +++ b/orchestrator/controller/start/start_test.go @@ -284,7 +284,7 @@ func TestController_Process_StorageFailure(t *testing.T) { err = controller.Process(context.Background(), delivery) require.Error(t, err) - assert.True(t, errs.IsRetryable(err)) + assert.False(t, errs.IsRetryable(err)) } func TestController_Process_AlreadyExistsSucceeds(t *testing.T) { diff --git a/orchestrator/controller/validate/validate.go b/orchestrator/controller/validate/validate.go index 1278616f..e17fb009 100644 --- a/orchestrator/controller/validate/validate.go +++ b/orchestrator/controller/validate/validate.go @@ -73,12 +73,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Deserialize request entity request, err := entity.RequestFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize request", - "message_id", msg.ID, - "partition_key", msg.PartitionKey, - "attempt", delivery.Attempt(), - "error", err, - ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count return fmt.Errorf("failed to deserialize request: %w", err) @@ -96,10 +90,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Merge conflict check mergeResult, err := c.mergeChecker.Check(ctx, request.Queue, request.Change) if err != nil { - c.logger.Errorw("merge check failed", - "request_id", request.ID, - "error", err, - ) c.metricsScope.Counter("merge_check_errors").Inc(1) return fmt.Errorf("merge check failed: %w", err) } @@ -115,13 +105,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Publish to batch topic if err := c.publish(ctx, consumer.TopicKeyBatch, request); err != nil { - c.logger.Errorw("failed to publish output", - "request_id", request.ID, - "topic_key", consumer.TopicKeyBatch, - "error", err, - ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to batch: %w", err)) + return fmt.Errorf("failed to publish to batch: %w", err) } c.logger.Infow("published request to batch",