fix: send last error response for aborted jobs to reporting (#3692)
cisse21 committed Aug 2, 2023
1 parent fbd99c7 commit cfbeee9
Showing 2 changed files with 11 additions and 9 deletions.
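In short: when a job is aborted because its retry limit is reached, the worker now sends the job's last stored error response to reporting as the abort reason, instead of the static string "retry limit reached". The static string survives only as the low-cardinality "reasons" tag on the abort metric. A minimal sketch of the resulting control flow, paraphrasing the worker.go hunk below (all identifiers are from the diff; elided arguments are unchanged):

	abort, abortReason := routerutils.ToBeDrained(job, parameters.DestinationID, /* ... */)
	abortTag := abortReason // the metric tag starts out mirroring the drain reason

	if !abort {
		abort = w.retryLimitReached(&job.LastJobStatus)
		abortReason = string(job.LastJobStatus.ErrorResponse) // forward the real last error
		abortTag = "retry limit reached"                      // keep the metric tag static
	}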
router/router_test.go (10 changes: 5 additions & 5 deletions)
@@ -616,9 +616,9 @@ var _ = Describe("router", func() {
 				},
 				Parameters: []byte(fmt.Sprintf(`{
 					"source_id": "1fMCVYZboDlYlauh4GFsEo2JU77",
-					"destination_id": "%s",
-					"message_id": "2f548e6d-60f6-44af-a1f4-62b3272445c3",
-					"received_at": "%s",
+					"destination_id": "%s",
+					"message_id": "2f548e6d-60f6-44af-a1f4-62b3272445c3",
+					"received_at": "%s",
 					"transform_at": "processor"
 				}`, gaDestinationID, firstAttemptedAt.Add(-time.Minute).Format(misc.RFC3339Milli))),
 				WorkspaceId: workspaceID,
@@ -660,7 +660,7 @@ var _ = Describe("router", func() {
 			c.mockRouterJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).Times(1).
 				Do(func(ctx context.Context, tx jobsdb.UpdateSafeTx, drainList []*jobsdb.JobStatusT, _, _ interface{}) {
 					Expect(drainList).To(HaveLen(1))
-					assertJobStatus(jobs[0], drainList[0], jobsdb.Aborted.State, strconv.Itoa(routerUtils.DRAIN_ERROR_CODE), `{"reason": "retry limit reached"}`, jobs[0].LastJobStatus.AttemptNum)
+					assertJobStatus(jobs[0], drainList[0], jobsdb.Aborted.State, strconv.Itoa(routerUtils.DRAIN_ERROR_CODE), fmt.Sprintf(`{"reason": %s}`, fmt.Sprintf(`{"firstAttemptedAt": %q}`, firstAttemptedAt.Format(misc.RFC3339Milli))), jobs[0].LastJobStatus.AttemptNum)
 					routerAborted = true
 				})
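(The expected abort status no longer carries the literal reason "retry limit reached": the test now derives it from the job's last error response, so the nested fmt.Sprintf evaluates to something like {"reason": {"firstAttemptedAt": "2023-08-02T10:28:00.000Z"}}, timestamp illustrative.)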

@@ -1572,7 +1572,7 @@ func assertJobStatus(job *jobsdb.JobT, status *jobsdb.JobStatusT, expectedState,
 	if attemptNum >= 1 {
 		Expect(gjson.GetBytes(status.ErrorResponse, "content-type").String()).To(Equal(gjson.Get(errorResponse, "content-type").String()))
 		Expect(gjson.GetBytes(status.ErrorResponse, "response").String()).To(Equal(gjson.Get(errorResponse, "response").String()))
-		Expect(gjson.GetBytes(status.ErrorResponse, "reason").String()).To(Equal(gjson.Get(errorResponse, "reason").String()))
+		Expect(gjson.Get(string(status.ErrorResponse), "reason").String()).To(Equal(gjson.Get(errorResponse, "reason").String()))
 	}
 	Expect(status.ExecTime).To(BeTemporally("~", time.Now(), 10*time.Second))
 	Expect(status.RetryTime).To(BeTemporally(">=", status.ExecTime, 10*time.Second))
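Aside: the last assertion switches from gjson.GetBytes to gjson.Get over a converted string. The two calls are interchangeable here, differing only in input type. A self-contained illustration (payload invented for the example):

	package main

	import (
		"fmt"

		"github.com/tidwall/gjson"
	)

	func main() {
		raw := []byte(`{"reason": {"firstAttemptedAt": "2023-08-02T10:00:00.000Z"}}`)
		// Both calls parse the same JSON and return a gjson.Result for "reason".
		fmt.Println(gjson.GetBytes(raw, "reason").String())    // []byte input
		fmt.Println(gjson.Get(string(raw), "reason").String()) // string input
	}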
router/worker.go (10 changes: 6 additions & 4 deletions)
@@ -10,9 +10,10 @@ import (
 	"strings"
 	"time"
 
-	"golang.org/x/exp/slices"
-
 	"github.com/samber/lo"
 	"github.com/tidwall/gjson"
+	"golang.org/x/exp/slices"
 
 	"github.com/rudderlabs/rudder-go-kit/logger"
 	"github.com/rudderlabs/rudder-go-kit/stats"
@@ -89,11 +90,12 @@ func (w *worker) workLoop() {
 			}
 			w.rt.destinationsMapMu.RLock()
 			abort, abortReason := routerutils.ToBeDrained(job, parameters.DestinationID, w.rt.reloadableConfig.toAbortDestinationIDs, w.rt.destinationsMap)
+			abortTag := abortReason
 			w.rt.destinationsMapMu.RUnlock()
 
 			if !abort {
 				abort = w.retryLimitReached(&job.LastJobStatus)
-				abortReason = "retry limit reached"
+				abortReason = string(job.LastJobStatus.ErrorResponse)
+				abortTag = "retry limit reached"
 			}
 			if abort {
 				status := jobsdb.JobStatusT{
@@ -116,7 +118,7 @@
 					"destType": w.rt.destType,
 					"destId": parameters.DestinationID,
 					"module": "router",
-					"reasons": abortReason,
+					"reasons": abortTag,
 					"workspaceId": job.WorkspaceId,
 				}).Count(1)
 				continue
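Note the split this hunk completes: the full error response travels in the aborted job's status, and from there to reporting, while the counter above stays tagged with the static abortTag. Tagging with raw error payloads would presumably mint a new time series per distinct error and blow up label cardinality.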
