Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improve rsources service table setup procedure #4165

Merged
merged 2 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ jobs:
username: rudderlabs
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Warehouse Service Integration [ ${{ matrix.destination }} ]
run: make test-warehouse package=./${{ matrix.package }}/...
run: make test-warehouse package=${{ matrix.package }}
env:
BIGQUERY_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDENTIALS }}
DATABRICKS_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.DATABRICKS_INTEGRATION_TEST_CREDENTIALS }}
Expand Down Expand Up @@ -123,8 +123,13 @@ jobs:
- regulation-worker
- router
- services
- services/rsources
- suppression-backup-service
- warehouse
include:
- package: services
exclude: services/rsources

steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
Expand All @@ -143,7 +148,7 @@ jobs:
TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING: ${{ secrets.TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING }}
TEST_S3_DATALAKE_CREDENTIALS: ${{ secrets.TEST_S3_DATALAKE_CREDENTIALS }}
BIGQUERY_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDENTIALS }}
run: make test package=./${{ matrix.package }}/...
run: make test exclude="${{ matrix.exclude }}" package=${{ matrix.package }}
- name: Sanitize name for Artifact
run: |
name=$(echo -n "${{ matrix.package }}" | sed -e 's/[ \t:\/\\"<>|*?]/-/g' -e 's/--*/-/g')
Expand Down
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ else
$(eval TEST_OPTIONS = -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=atomic -coverpkg=./... -vet=all --timeout=15m)
endif
ifdef package
$(TEST_CMD) $(TEST_OPTIONS) $(package) && touch $(TESTFILE) || true
ifdef exclude
$(eval FILES = `go list ./$(package)/... | egrep -iv '$(exclude)'`)
$(TEST_CMD) -count=1 $(TEST_OPTIONS) $(FILES) && touch $(TESTFILE) || true
else
$(TEST_CMD) $(TEST_OPTIONS) ./$(package)/... && touch $(TESTFILE) || true
endif
else ifdef exclude
$(eval FILES = `go list ./... | egrep -iv '$(exclude)'`)
$(TEST_CMD) -count=1 $(TEST_OPTIONS) $(FILES) && touch $(TESTFILE) || true
Comment on lines 20 to 31
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test command logic for handling package and exclude variables is correctly implemented. However, consider using $(shell ...) instead of backticks for command substitution to improve readability and maintainability in the Makefile.

Expand All @@ -32,7 +37,7 @@ test-warehouse-integration:
$(eval TEST_PATTERN = 'TestIntegration')
$(eval TEST_CMD = SLOW=1 go test)
$(eval TEST_OPTIONS = -v -p 8 -timeout 30m -count 1 -run $(TEST_PATTERN) -coverprofile=profile.out -covermode=atomic -coverpkg=./...)
$(TEST_CMD) $(TEST_OPTIONS) $(package) && touch $(TESTFILE) || true
$(TEST_CMD) $(TEST_OPTIONS) ./$(package)/... && touch $(TESTFILE) || true

test-warehouse: test-warehouse-integration test-teardown

Expand Down
71 changes: 46 additions & 25 deletions services/rsources/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,26 +476,40 @@
return time.After(config.GetDuration("Rsources.stats.cleanup.interval", 1, time.Hour))
}
}
sh.log.Debugf("setting up rsources tables in %s", sh.config.LocalHostname)
if err := setupTables(ctx, sh.localDB, sh.config.LocalHostname, sh.log); err != nil {

const lockID = 100020001

if err := withAdvisoryLock(ctx, sh.localDB, lockID, func(tx *sql.Tx) error {
sh.log.Debugf("setting up rsources tables in %s", sh.config.LocalHostname)
if err := setupTables(ctx, sh.localDB, sh.config.LocalHostname, sh.log); err != nil {
return err
}

Check warning on line 486 in services/rsources/handler.go

View check run for this annotation

Codecov / codecov/patch

services/rsources/handler.go#L485-L486

Added lines #L485 - L486 were not covered by tests
if err := migrateFailedKeysTable(ctx, tx); err != nil {
return fmt.Errorf("migrating rsources_failed_keys table: %w", err)
}

Check warning on line 489 in services/rsources/handler.go

View check run for this annotation

Codecov / codecov/patch

services/rsources/handler.go#L488-L489

Added lines #L488 - L489 were not covered by tests
sh.log.Debugf("rsources tables setup successfully in %s", sh.config.LocalHostname)
return nil
}); err != nil {

Check warning on line 492 in services/rsources/handler.go

View check run for this annotation

Codecov / codecov/patch

services/rsources/handler.go#L492

Added line #L492 was not covered by tests
return err
}
if err := migrateFailedKeysTable(ctx, sh.localDB); err != nil {
return fmt.Errorf("failed to migrate rsources_failed_keys table: %w", err)
}
sh.log.Debugf("rsources tables setup successfully in %s", sh.config.LocalHostname)

if sh.sharedDB != nil {
sh.log.Debugf("setting up rsources tables for shared db %s", sh.config.SharedConn)
if err := setupTables(ctx, sh.sharedDB, "shared", sh.log); err != nil {
return err
}
sh.log.Debugf("rsources tables for shared db %s setup successfully", sh.config.SharedConn)
if err := withAdvisoryLock(ctx, sh.sharedDB, lockID, func(_ *sql.Tx) error {
sh.log.Debugf("setting up rsources tables for shared db %s", sh.config.SharedConn)
if err := setupTables(ctx, sh.sharedDB, "shared", sh.log); err != nil {
return err
}

Check warning on line 501 in services/rsources/handler.go

View check run for this annotation

Codecov / codecov/patch

services/rsources/handler.go#L500-L501

Added lines #L500 - L501 were not covered by tests
sh.log.Debugf("rsources tables for shared db %s setup successfully", sh.config.SharedConn)

sh.log.Debugf("setting up rsources logical replication in %s", sh.config.LocalHostname)
if err := sh.setupLogicalReplication(ctx); err != nil {
return fmt.Errorf("failed to setup rsources logical replication in %s: %w", sh.config.LocalHostname, err)
sh.log.Debugf("setting up rsources logical replication in %s", sh.config.LocalHostname)
if err := sh.setupLogicalReplication(ctx); err != nil {
return fmt.Errorf("logical replication in %q: %w", sh.config.LocalHostname, err)
}
sh.log.Debugf("rsources logical replication setup successfully in %s", sh.config.LocalHostname)
return nil
}); err != nil {
return err
}
sh.log.Debugf("rsources logical replication setup successfully in %s", sh.config.LocalHostname)
}
return nil
Comment on lines 476 to 514
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider refactoring the init function to break down the setup of local and shared databases into separate, smaller functions for improved readability and maintainability.

}
Expand All @@ -511,15 +525,7 @@
}

// TODO: Remove this after a few releases
func migrateFailedKeysTable(ctx context.Context, db *sql.DB) error {
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("failed to start transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
if _, err := tx.ExecContext(ctx, `SELECT pg_advisory_xact_lock(100020001)`); err != nil {
return fmt.Errorf("error while acquiring advisory lock for failed keys migration: %w", err)
}
func migrateFailedKeysTable(ctx context.Context, tx *sql.Tx) error {
var previousTableExists bool
row := tx.QueryRowContext(ctx, `SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname NOT IN ('pg_catalog','information_schema') AND tablename = 'rsources_failed_keys')`)
if err := row.Scan(&previousTableExists); err != nil {
Expand Down Expand Up @@ -597,7 +603,7 @@
if _, err := tx.ExecContext(ctx, `drop function if exists ksuid()`); err != nil {
return fmt.Errorf("failed to drop ksuid function: %w", err)
}
return tx.Commit()
return nil
}

return nil
Expand Down Expand Up @@ -803,3 +809,18 @@
}
}
}

func withAdvisoryLock(ctx context.Context, db *sql.DB, lockId int64, f func(tx *sql.Tx) error) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to start transaction: %w", err)
}

Check warning on line 817 in services/rsources/handler.go

View check run for this annotation

Codecov / codecov/patch

services/rsources/handler.go#L816-L817

Added lines #L816 - L817 were not covered by tests
defer func() { _ = tx.Rollback() }()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The deferred rollback in withAdvisoryLock should be conditionally executed only if the commit has not been successful to avoid unnecessary rollbacks.

- defer func() { _ = tx.Rollback() }()
+ defer func() {
+   if r := recover(); r != nil || err != nil {
+     _ = tx.Rollback()
+   }
+ }()

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
defer func() { _ = tx.Rollback() }()
defer func() {
if r := recover(); r != nil || err != nil {
_ = tx.Rollback()
}
}()

if _, err := tx.ExecContext(ctx, `SELECT pg_advisory_xact_lock($1)`, lockId); err != nil {
return fmt.Errorf("acquiring advisory lock: %w", err)
}

Check warning on line 821 in services/rsources/handler.go

View check run for this annotation

Codecov / codecov/patch

services/rsources/handler.go#L820-L821

Added lines #L820 - L821 were not covered by tests
if err := f(tx); err != nil {
return err
}
return tx.Commit()
}
12 changes: 6 additions & 6 deletions services/rsources/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var _ = Describe("Using sources handler", func() {
Out: 4,
Failed: 6,
}
BeforeAll(func() {
BeforeEach(func() {
var err error
pool, err = dockertest.NewPool("")
Expect(err).NotTo(HaveOccurred())
Expand All @@ -65,7 +65,7 @@ var _ = Describe("Using sources handler", func() {
sh = createService(config)
})

AfterAll(func() {
AfterEach(func() {
purgeResources(pool, resource.resource)
})

Expand Down Expand Up @@ -655,7 +655,7 @@ var _ = Describe("Using sources handler", func() {
serviceA, serviceB JobService
)

BeforeAll(func() {
BeforeEach(func() {
var err error
pool, err = dockertest.NewPool("")
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -700,7 +700,7 @@ var _ = Describe("Using sources handler", func() {
serviceB = createService(configB)
})

AfterAll(func() {
AfterEach(func() {
purgeResources(pool, pgA.resource, pgB.resource, pgC.resource)
if network != nil {
_ = pool.Client.RemoveNetwork(network.ID)
Expand Down Expand Up @@ -888,7 +888,7 @@ var _ = Describe("Using sources handler", func() {
serviceA JobService
)

BeforeAll(func() {
BeforeEach(func() {
var err error
pool, err = dockertest.NewPool("")
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -918,7 +918,7 @@ var _ = Describe("Using sources handler", func() {
serviceA = createService(configA)
})

AfterAll(func() {
AfterEach(func() {
purgeResources(pool, pgA.resource, pgB.resource)
if network != nil {
_ = pool.Client.RemoveNetwork(network.ID)
Expand Down
8 changes: 4 additions & 4 deletions services/rsources/handler_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var _ = Describe("Using sources handler v1", func() {
Out: 4,
Failed: 6,
}
BeforeAll(func() {
BeforeEach(func() {
var err error
pool, err = dockertest.NewPool("")
Expect(err).NotTo(HaveOccurred())
Comment on lines 31 to 37
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This review was outside the patches, and no patch overlapping with it was found. Original lines [40-40]

The MaxPoolSize is set to 1 in the JobServiceConfig, which might not be sufficient for concurrent test execution. Consider increasing this value to ensure that there are enough connections available in the pool for the service to function correctly, especially during parallel test runs.


Note: This review was outside the patches, and no patch overlapping with it was found. Original lines [376-376]

The MaxPoolSize is set to 1 in the JobServiceConfig for the multitenant setup, which could lead to insufficient database connections during concurrent test execution. Consider increasing this value to ensure that there are enough connections available in the pool for the service to function correctly, especially during parallel test runs.

Expand All @@ -45,7 +45,7 @@ var _ = Describe("Using sources handler v1", func() {
sh = createService(config)
})

AfterAll(func() {
AfterEach(func() {
purgeResources(pool, resource.resource)
})

Expand Down Expand Up @@ -337,7 +337,7 @@ var _ = Describe("Using sources handler v1", func() {
serviceA, serviceB JobService
)

BeforeAll(func() {
BeforeEach(func() {
var err error
pool, err = dockertest.NewPool("")
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -382,7 +382,7 @@ var _ = Describe("Using sources handler v1", func() {
serviceB = createService(configB)
})

AfterAll(func() {
AfterEach(func() {
purgeResources(pool, pgA.resource, pgB.resource, pgC.resource)
if network != nil {
_ = pool.Client.RemoveNetwork(network.ID)
Expand Down
3 changes: 3 additions & 0 deletions services/rsources/rsources.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ func NewJobService(config JobServiceConfig) (JobService, error) {
if config.Log == nil {
config.Log = logger.NewLogger().Child("rsources")
}
if config.MaxPoolSize <= 2 {
config.MaxPoolSize = 2 // minimum 2 connections in the pool for proper startup
}
Comment on lines +202 to +204
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure that MaxPoolSize is not only at least 2, but also that it is not a negative value, which could cause unexpected behavior.

var (
localDB, sharedDB *sql.DB
err error
Comment on lines 199 to 207
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider checking the error returned by sql.Open before setting MaxOpenConns on localDB and sharedDB to ensure that the database connections were successfully opened.

Expand Down
Loading