From cfbeee972c9fa7a78890c01b69fd585f0894d212 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Wed, 2 Aug 2023 17:23:23 +0530 Subject: [PATCH] fix: send last error response for aborted jobs to reporting (#3692) --- router/router_test.go | 10 +++++----- router/worker.go | 10 ++++++---- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/router/router_test.go b/router/router_test.go index 51f13b6d07..84d2bb0f08 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -616,9 +616,9 @@ var _ = Describe("router", func() { }, Parameters: []byte(fmt.Sprintf(`{ "source_id": "1fMCVYZboDlYlauh4GFsEo2JU77", - "destination_id": "%s", - "message_id": "2f548e6d-60f6-44af-a1f4-62b3272445c3", - "received_at": "%s", + "destination_id": "%s", + "message_id": "2f548e6d-60f6-44af-a1f4-62b3272445c3", + "received_at": "%s", "transform_at": "processor" }`, gaDestinationID, firstAttemptedAt.Add(-time.Minute).Format(misc.RFC3339Milli))), WorkspaceId: workspaceID, @@ -660,7 +660,7 @@ var _ = Describe("router", func() { c.mockRouterJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).Times(1). Do(func(ctx context.Context, tx jobsdb.UpdateSafeTx, drainList []*jobsdb.JobStatusT, _, _ interface{}) { Expect(drainList).To(HaveLen(1)) - assertJobStatus(jobs[0], drainList[0], jobsdb.Aborted.State, strconv.Itoa(routerUtils.DRAIN_ERROR_CODE), `{"reason": "retry limit reached"}`, jobs[0].LastJobStatus.AttemptNum) + assertJobStatus(jobs[0], drainList[0], jobsdb.Aborted.State, strconv.Itoa(routerUtils.DRAIN_ERROR_CODE), fmt.Sprintf(`{"reason": %s}`, fmt.Sprintf(`{"firstAttemptedAt": %q}`, firstAttemptedAt.Format(misc.RFC3339Milli))), jobs[0].LastJobStatus.AttemptNum) routerAborted = true }) @@ -1572,7 +1572,7 @@ func assertJobStatus(job *jobsdb.JobT, status *jobsdb.JobStatusT, expectedState, if attemptNum >= 1 { Expect(gjson.GetBytes(status.ErrorResponse, "content-type").String()).To(Equal(gjson.Get(errorResponse, "content-type").String())) Expect(gjson.GetBytes(status.ErrorResponse, "response").String()).To(Equal(gjson.Get(errorResponse, "response").String())) - Expect(gjson.GetBytes(status.ErrorResponse, "reason").String()).To(Equal(gjson.Get(errorResponse, "reason").String())) + Expect(gjson.Get(string(status.ErrorResponse), "reason").String()).To(Equal(gjson.Get(errorResponse, "reason").String())) } Expect(status.ExecTime).To(BeTemporally("~", time.Now(), 10*time.Second)) Expect(status.RetryTime).To(BeTemporally(">=", status.ExecTime, 10*time.Second)) diff --git a/router/worker.go b/router/worker.go index c8c089b9ca..880ec50526 100644 --- a/router/worker.go +++ b/router/worker.go @@ -10,9 +10,10 @@ import ( "strings" "time" + "golang.org/x/exp/slices" + "github.com/samber/lo" "github.com/tidwall/gjson" - "golang.org/x/exp/slices" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" @@ -89,11 +90,12 @@ func (w *worker) workLoop() { } w.rt.destinationsMapMu.RLock() abort, abortReason := routerutils.ToBeDrained(job, parameters.DestinationID, w.rt.reloadableConfig.toAbortDestinationIDs, w.rt.destinationsMap) + abortTag := abortReason w.rt.destinationsMapMu.RUnlock() - if !abort { abort = w.retryLimitReached(&job.LastJobStatus) - abortReason = "retry limit reached" + abortReason = string(job.LastJobStatus.ErrorResponse) + abortTag = "retry limit reached" } if abort { status := jobsdb.JobStatusT{ @@ -116,7 +118,7 @@ func (w *worker) workLoop() { "destType": w.rt.destType, "destId": parameters.DestinationID, "module": "router", - "reasons": abortReason, + "reasons": abortTag, "workspaceId": job.WorkspaceId, }).Count(1) continue