diff --git a/warehouse/internal/repo/source.go b/warehouse/internal/repo/source.go index ebcc882880..fc2cac0b3c 100644 --- a/warehouse/internal/repo/source.go +++ b/warehouse/internal/repo/source.go @@ -237,7 +237,7 @@ func (s *Source) OnUpdateSuccess(ctx context.Context, id int64) error { WHERE id = $3; `, - model.SourceJobStatusSucceeded, + model.SourceJobStatusSucceeded.String(), s.now(), id, ) @@ -270,8 +270,8 @@ func (s *Source) OnUpdateFailure(ctx context.Context, id int64, error error, max id = $6; `, maxAttempt, - model.SourceJobStatusAborted, - model.SourceJobStatusFailed, + model.SourceJobStatusAborted.String(), + model.SourceJobStatusFailed.String(), s.now(), error.Error(), id, diff --git a/warehouse/source/source.go b/warehouse/source/source.go index 3723105c0c..4d0a11488b 100644 --- a/warehouse/source/source.go +++ b/warehouse/source/source.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "github.com/lib/pq" + "github.com/samber/lo" "github.com/rudderlabs/rudder-server/services/notifier" @@ -60,11 +62,8 @@ func New(conf *config.Config, log logger.Logger, db *sqlmw.DB, publisher publish } func (m *Manager) InsertJobs(ctx context.Context, payload insertJobRequest) ([]int64, error) { - var jobType model.SourceJobType - switch payload.JobType { - case model.SourceJobTypeDeleteByJobRunID.String(): - jobType = model.SourceJobTypeDeleteByJobRunID - default: + jobType, err := model.FromSourceJobType(payload.JobType) + if err != nil { return nil, fmt.Errorf("invalid job type %s", payload.JobType) } @@ -127,8 +126,15 @@ func (m *Manager) Run(ctx context.Context) error { return fmt.Errorf("resetting source jobs with error %w", err) } - if err := m.process(ctx); err != nil && !errors.Is(err, context.Canceled) { - return fmt.Errorf("processing source jobs with error %w", err) + if err := m.process(ctx); err != nil { + var pqErr *pq.Error + + switch { + case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), errors.As(err, &pqErr) && pqErr.Code == "57014": + return nil + default: + return fmt.Errorf("processing source jobs with error %w", err) + } } return nil }