Skip to content

Commit

Permalink
fix(warehouse): added check for nil warehouse manager during error ma…
Browse files Browse the repository at this point in the history
…pping (#2981)
  • Loading branch information
achettyiitr committed Feb 16, 2023
1 parent dfc2273 commit a258f74
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 30 deletions.
4 changes: 4 additions & 0 deletions warehouse/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type ErrorHandler struct {
// and returns the corresponding joins of the matched error type
// else returns UnknownError
func (e *ErrorHandler) MatchErrorMappings(err error) Tag {
if e.Manager == nil || err == nil {
return Tag{Name: "error_mapping", Value: string(model.Noop)}
}

var (
errMappings []string
errString = err.Error()
Expand Down
21 changes: 20 additions & 1 deletion warehouse/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestErrorHandler_MatchErrorMappings(t *testing.T) {
tag := er.MatchErrorMappings(errors.New(uploadError))
require.Equal(t, tag.Name, "error_mapping")
require.NotContains(t, tag.Value, string(model.UnknownError))
require.NotContains(t, tag.Value, string(model.Noop))
}
})

Expand All @@ -77,7 +78,25 @@ func TestErrorHandler_MatchErrorMappings(t *testing.T) {
er := &warehouse.ErrorHandler{Manager: m}
tag := er.MatchErrorMappings(errors.New("unknown error"))
require.Equal(t, tag.Name, "error_mapping")
require.Contains(t, tag.Value, string(model.UnknownError))
require.Equal(t, tag.Value, string(model.UnknownError))
})

t.Run("Nil manager: "+destType, func(t *testing.T) {
t.Parallel()

er := &warehouse.ErrorHandler{Manager: nil}
tag := er.MatchErrorMappings(errors.New("unknown error"))
require.Equal(t, tag.Name, "error_mapping")
require.Equal(t, tag.Value, string(model.Noop))
})

t.Run("Nil error: "+destType, func(t *testing.T) {
t.Parallel()

er := &warehouse.ErrorHandler{Manager: nil}
tag := er.MatchErrorMappings(errors.New("unknown error"))
require.Equal(t, tag.Name, "error_mapping")
require.Equal(t, tag.Value, string(model.Noop))
})
}
}
1 change: 1 addition & 0 deletions warehouse/internal/model/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
InsufficientResourceError JobErrorType = "insufficient_resource_error"
ConcurrentQueriesError JobErrorType = "concurrent_queries_error"
UnknownError JobErrorType = "unknown_error"
Noop JobErrorType = "noop"
)

var ErrUploadNotFound = errors.New("upload not found")
Expand Down
32 changes: 17 additions & 15 deletions warehouse/logfield/logfield.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package logfield

const (
UploadJobID = "uploadJobID"
UploadStatus = "uploadStatus"
UseRudderStorage = "useRudderStorage"
SourceID = "sourceID"
SourceType = "sourceType"
DestinationID = "destinationID"
DestinationType = "destinationType"
WorkspaceID = "workspaceID"
Namespace = "namespace"
Error = "error"
TableName = "tableName"
Priority = "priority"
Retried = "retried"
Attempt = "attempt"
LoadFileType = "loadFileType"
UploadJobID = "uploadJobID"
UploadStatus = "uploadStatus"
UseRudderStorage = "useRudderStorage"
SourceID = "sourceID"
SourceType = "sourceType"
DestinationID = "destinationID"
DestinationType = "destinationType"
WorkspaceID = "workspaceID"
Namespace = "namespace"
Error = "error"
TableName = "tableName"
Priority = "priority"
Retried = "retried"
Attempt = "attempt"
LoadFileType = "loadFileType"
ErrorMapping = "errorMapping"
DestinationCredsValid = "destinationCredsValid"
)
11 changes: 0 additions & 11 deletions warehouse/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,6 @@ func (job *UploadJobT) generateUploadAbortedMetrics() {
}

job.counterStat("num_staged_events").Count(int(numStagedEvents))

// Set the upload_aborted stat
attempts := job.getAttemptNumber()
tags := []Tag{{Name: "attempt_number", Value: strconv.Itoa(attempts)}}
valid, err := job.validateDestinationCredentials()
if err == nil {
// Only if error is nil, meaning we were able to
// successfully validate the creds, we set this tag
tags = append(tags, Tag{Name: "destination_creds_valid", Value: strconv.FormatBool(valid)})
}
job.counterStat("upload_aborted", tags...).Count(1)
}

func (job *UploadJobT) recordTableLoad(tableName string, numEvents int64) {
Expand Down
26 changes: 25 additions & 1 deletion warehouse/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var _ = Describe("Stats", Ordered, func() {
cleanup.Run()
})

Describe("Generate upload success/aborted metrics", func() {
Describe("Generate upload success metrics", func() {
var job *UploadJobT

BeforeEach(func() {
Expand All @@ -77,6 +77,30 @@ var _ = Describe("Stats", Ordered, func() {
It("Success metrics", func() {
job.generateUploadSuccessMetrics()
})
})

Describe("Generate upload aborted metrics", func() {
var job *UploadJobT

BeforeEach(func() {
mockStats, mockMeasurement := getMockStats(g)
mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(mockMeasurement)
mockMeasurement.EXPECT().Count(4).Times(2)

job = &UploadJobT{
upload: model.Upload{
ID: uploadID,
StagingFileStartID: 1,
StagingFileEndID: 4,
SourceID: "test-sourceID",
DestinationID: "test-destinationID",
},
warehouse: warehouseutils.Warehouse{
Type: "POSTGRES",
},
stats: mockStats,
}
})

It("Aborted metrics", func() {
job.generateUploadAbortedMetrics()
Expand Down
10 changes: 8 additions & 2 deletions warehouse/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,11 @@ func (job *UploadJobT) Aborted(attempts int, startTime time.Time) bool {
}

func (job *UploadJobT) setUploadError(statusError error, state string) (string, error) {
var (
errorTags = job.ErrorHandler.MatchErrorMappings(statusError)
destCredentialsValidations *bool
)

defer func() {
pkgLogger.Warnw("upload error",
logfield.UploadJobID, job.upload.ID,
Expand All @@ -1609,6 +1614,8 @@ func (job *UploadJobT) setUploadError(statusError error, state string) (string,
logfield.Retried, job.upload.Retried,
logfield.Attempt, job.upload.Attempts,
logfield.LoadFileType, job.upload.LoadFileType,
logfield.ErrorMapping, errorTags.Value,
logfield.DestinationCredsValid, destCredentialsValidations,
)
}()

Expand Down Expand Up @@ -1734,14 +1741,13 @@ func (job *UploadJobT) setUploadError(statusError error, state string) (string,
if state == model.Aborted {
// base tag to be sent as stat

errorTags := job.ErrorHandler.MatchErrorMappings(statusError)

tags := []Tag{{Name: "attempt_number", Value: strconv.Itoa(attempts)}}
tags = append(tags, errorTags)

valid, err := job.validateDestinationCredentials()
if err == nil {
tags = append(tags, Tag{Name: "destination_creds_valid", Value: strconv.FormatBool(valid)})
destCredentialsValidations = &valid
}

job.counterStat("upload_aborted", tags...).Count(1)
Expand Down

0 comments on commit a258f74

Please sign in to comment.