Skip to content

Commit

Permalink
chore: some minor chagnes
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Oct 31, 2023
1 parent 598fab5 commit 59b73c1
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
6 changes: 3 additions & 3 deletions warehouse/internal/repo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (s *Source) OnUpdateSuccess(ctx context.Context, id int64) error {
WHERE
id = $3;
`,
model.SourceJobStatusSucceeded,
model.SourceJobStatusSucceeded.String(),
s.now(),
id,
)
Expand Down Expand Up @@ -270,8 +270,8 @@ func (s *Source) OnUpdateFailure(ctx context.Context, id int64, error error, max
id = $6;
`,
maxAttempt,
model.SourceJobStatusAborted,
model.SourceJobStatusFailed,
model.SourceJobStatusAborted.String(),
model.SourceJobStatusFailed.String(),
s.now(),
error.Error(),
id,
Expand Down
20 changes: 13 additions & 7 deletions warehouse/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"
"time"

"github.com/lib/pq"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/services/notifier"
Expand Down Expand Up @@ -60,11 +62,8 @@ func New(conf *config.Config, log logger.Logger, db *sqlmw.DB, publisher publish
}

func (m *Manager) InsertJobs(ctx context.Context, payload insertJobRequest) ([]int64, error) {
var jobType model.SourceJobType
switch payload.JobType {
case model.SourceJobTypeDeleteByJobRunID.String():
jobType = model.SourceJobTypeDeleteByJobRunID
default:
jobType, err := model.FromSourceJobType(payload.JobType)
if err != nil {
return nil, fmt.Errorf("invalid job type %s", payload.JobType)
}

Check warning on line 68 in warehouse/source/source.go

View check run for this annotation

Codecov / codecov/patch

warehouse/source/source.go#L67-L68

Added lines #L67 - L68 were not covered by tests

Expand Down Expand Up @@ -127,8 +126,15 @@ func (m *Manager) Run(ctx context.Context) error {
return fmt.Errorf("resetting source jobs with error %w", err)
}

Check warning on line 127 in warehouse/source/source.go

View check run for this annotation

Codecov / codecov/patch

warehouse/source/source.go#L126-L127

Added lines #L126 - L127 were not covered by tests

if err := m.process(ctx); err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("processing source jobs with error %w", err)
if err := m.process(ctx); err != nil {
var pqErr *pq.Error

switch {
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), errors.As(err, &pqErr) && pqErr.Code == "57014":
return nil
default:
return fmt.Errorf("processing source jobs with error %w", err)
}
}
return nil
}
Expand Down

0 comments on commit 59b73c1

Please sign in to comment.