fix(warehouse): infinites retries #3050

Merged
merged 20 commits into from
Mar 10, 2023

Conversation

achettyiitr
Member

@achettyiitr achettyiitr commented Feb 28, 2023

Description

  • Since we already have hard limits of x hours and y attempts, we don't need hasAllTablesSkipped.
    • The problem with hasAllTablesSkipped is that it is shared across multiple goroutines during exporting (regular tables, identifies/users tables, ID resolution).
    • Because of the bug that causes already-succeeded tables to be retried, the oldest exporting goroutines set it to true during export whenever that goroutine's exports succeed.
    • Later, this flag is used to decide whether we skip marking the upload status as aborted.
  • Once the failed sync is retried, all succeeded tables are retried as well.

Notion Ticket

https://www.notion.so/rudderstacks/infinite-retries-a61e259c0b9349bfb5485083298d6baf?pvs=4

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

@achettyiitr achettyiitr marked this pull request as draft March 1, 2023 06:12
@achettyiitr achettyiitr marked this pull request as ready for review March 2, 2023 11:28
@codecov

codecov bot commented Mar 2, 2023

Codecov Report

Patch coverage: 48.97% and project coverage change: +0.13 🎉

Comparison is base (1d9f63b) 53.43% compared to head (a8de1fe) 53.56%.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3050      +/-   ##
==========================================
+ Coverage   53.43%   53.56%   +0.13%     
==========================================
  Files         342      346       +4     
  Lines       52747    53805    +1058     
==========================================
+ Hits        28185    28821     +636     
- Misses      22947    23349     +402     
- Partials     1615     1635      +20     
Impacted Files Coverage Δ
utils/misc/misc.go 51.37% <ø> (-0.49%) ⬇️
warehouse/identities.go 1.04% <0.00%> (-0.04%) ⬇️
warehouse/identity/identity.go 0.51% <0.00%> (+0.06%) ⬆️
...ehouse/integrations/azure-synapse/azure-synapse.go 0.31% <0.00%> (+0.02%) ⬆️
warehouse/integrations/postgres/postgres.go 0.53% <0.00%> (-28.10%) ⬇️
warehouse/internal/model/upload.go 100.00% <ø> (ø)
warehouse/internal/service/recovery.go 100.00% <ø> (ø)
warehouse/integrations/mssql/mssql.go 1.62% <6.20%> (+0.66%) ⬆️
warehouse/upload.go 20.13% <11.37%> (-3.47%) ⬇️
warehouse/integrations/postgres-legacy/postgres.go 28.62% <28.62%> (ø)
... and 11 more

... and 10 files with indirect coverage changes

Comment on lines -966 to -968
// TableSkipError is a custom error type to capture if a table load is skipped because of a previously failed table load
type TableSkipError struct {
	tableName        string
	previousJobID    int64
	previousJobError string
}

func (tse *TableSkipError) Error() string {
	return fmt.Sprintf("Skipping %s table because it previously failed to load in an earlier job: %d with error: %s", tse.tableName, tse.previousJobID, tse.previousJobError)
}

Member Author

Removing this as we are now avoiding TableSkipError.

@@ -6,20 +6,20 @@ import (
"fmt"
"os"
"testing"
time2 "time"

"github.com/aws/smithy-go/time"
Member

Why are we using this package for our tests?

Member Author

Let me remove this. Not sure why we are using this.

@achettyiitr achettyiitr requested a review from lvrach March 7, 2023 13:45
@achettyiitr achettyiitr changed the title fix(warehouse): infinites retries fix(warehouse): infinites retries and table uploads repository Mar 7, 2023
@achettyiitr achettyiitr changed the title fix(warehouse): infinites retries and table uploads repository fix(warehouse): infinites retries Mar 8, 2023
panic(err)
// Previous upload and table upload failed
exportingfailedStatus = func(status string) bool {
return status == TableUploadExportingFailed || status == UserTableUploadExportingFailed || status == IdentityTableUploadExportingFailed
Member

[minor] & opinionated

Suggested change
return status == TableUploadExportingFailed || status == UserTableUploadExportingFailed || status == IdentityTableUploadExportingFailed
switch status {
case TableUploadExportingFailed, UserTableUploadExportingFailed, IdentityTableUploadExportingFailed:
	return true
default:
	return false
}

Member

We should consider moving this logic outside of this function. Maybe it could be part of the table upload model, or introduce a separate failure flag.

The concern I have is when we introduce a new failed state we might forget to add it here.
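
One way to read the "separate failure flag" suggestion is a single table in which every status is declared together with its failure flag, so a new status cannot be added without deciding whether it counts as failed. This is only a sketch: `statusInfo`, `tableUploadStatuses`, and `isFailed` are hypothetical names, and the two status strings are the ones that appear in the test fixtures in this PR.

```go
package main

import "fmt"

// statusInfo carries the failure flag next to the status declaration.
// Hypothetical type, not part of the table upload model in this PR.
type statusInfo struct {
	failed bool
}

// tableUploadStatuses is the single place where statuses are declared.
// Adding a status here forces an explicit choice for the failed flag.
var tableUploadStatuses = map[string]statusInfo{
	"exporting_data":        {failed: false},
	"exporting_data_failed": {failed: true},
}

// isFailed returns the failure flag for a status and whether the status
// is known at all, so callers can surface unknown statuses instead of
// silently treating them as "not failed".
func isFailed(status string) (failed, known bool) {
	info, ok := tableUploadStatuses[status]
	return info.failed, ok
}

func main() {
	for _, s := range []string{"exporting_data", "exporting_data_failed", "something_new"} {
		failed, known := isFailed(s)
		fmt.Printf("%s: failed=%t known=%t\n", s, failed, known)
	}
}
```

Returning `known` separately is a design choice in this sketch: an undeclared status is reported to the caller rather than falling into a default branch.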

Member Author

Looks like the other 2 flags were not being used. Removing this.

Comment on lines 776 to 788
_, err := db.Exec(`INSERT INTO wh_uploads (id, destination_id, source_id, in_progress, destination_type, status, namespace, schema, created_at, updated_at)
VALUES
(1, 1, 1, true, 'RS', 'exporting_data', 'test_namespace', '{}', NOW(), NOW()),
(2, 1, 1, true, 'RS', 'aborted', 'test_namespace', '{}', NOW(), NOW())
`)
require.NoError(t, err)

_, err = db.Exec(`INSERT INTO wh_table_uploads (id, wh_upload_id, table_name, status, error, created_at, updated_at)
VALUES
(1, 1, 'test_table_1','exporting_data', '{}', NOW(), NOW()),
(2, 1, 'test_table_2','exporting_data_failed', 'error loading data', NOW(), NOW())
`)
require.NoError(t, err)
Member Author

This has been modified to use the repository in #3087.

3 participants