Skip to content

Commit

Permalink
chore: improve rsources service table setup procedure (#4165)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Nov 27, 2023
1 parent e803bf9 commit 42d5130
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 39 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ jobs:
username: rudderlabs
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Warehouse Service Integration [ ${{ matrix.destination }} ]
run: make test-warehouse package=./${{ matrix.package }}/...
run: make test-warehouse package=${{ matrix.package }}
env:
BIGQUERY_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDENTIALS }}
DATABRICKS_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.DATABRICKS_INTEGRATION_TEST_CREDENTIALS }}
Expand Down Expand Up @@ -123,8 +123,13 @@ jobs:
- regulation-worker
- router
- services
- services/rsources
- suppression-backup-service
- warehouse
include:
- package: services
exclude: services/rsources

steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
Expand All @@ -143,7 +148,7 @@ jobs:
TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING: ${{ secrets.TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING }}
TEST_S3_DATALAKE_CREDENTIALS: ${{ secrets.TEST_S3_DATALAKE_CREDENTIALS }}
BIGQUERY_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDENTIALS }}
run: make test package=./${{ matrix.package }}/...
run: make test exclude="${{ matrix.exclude }}" package=${{ matrix.package }}
- name: Sanitize name for Artifact
run: |
name=$(echo -n "${{ matrix.package }}" | sed -e 's/[ \t:\/\\"<>|*?]/-/g' -e 's/--*/-/g')
Expand Down
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ else
$(eval TEST_OPTIONS = -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=atomic -coverpkg=./... -vet=all --timeout=15m)
endif
ifdef package
$(TEST_CMD) $(TEST_OPTIONS) $(package) && touch $(TESTFILE) || true
ifdef exclude
$(eval FILES = `go list ./$(package)/... | egrep -iv '$(exclude)'`)
$(TEST_CMD) -count=1 $(TEST_OPTIONS) $(FILES) && touch $(TESTFILE) || true
else
$(TEST_CMD) $(TEST_OPTIONS) ./$(package)/... && touch $(TESTFILE) || true
endif
else ifdef exclude
$(eval FILES = `go list ./... | egrep -iv '$(exclude)'`)
$(TEST_CMD) -count=1 $(TEST_OPTIONS) $(FILES) && touch $(TESTFILE) || true
Expand All @@ -32,7 +37,7 @@ test-warehouse-integration:
$(eval TEST_PATTERN = 'TestIntegration')
$(eval TEST_CMD = SLOW=1 go test)
$(eval TEST_OPTIONS = -v -p 8 -timeout 30m -count 1 -run $(TEST_PATTERN) -coverprofile=profile.out -covermode=atomic -coverpkg=./...)
$(TEST_CMD) $(TEST_OPTIONS) $(package) && touch $(TESTFILE) || true
$(TEST_CMD) $(TEST_OPTIONS) ./$(package)/... && touch $(TESTFILE) || true

test-warehouse: test-warehouse-integration test-teardown

Expand Down
71 changes: 46 additions & 25 deletions services/rsources/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,26 +476,40 @@ func (sh *sourcesHandler) init() error {
return time.After(config.GetDuration("Rsources.stats.cleanup.interval", 1, time.Hour))
}
}
sh.log.Debugf("setting up rsources tables in %s", sh.config.LocalHostname)
if err := setupTables(ctx, sh.localDB, sh.config.LocalHostname, sh.log); err != nil {

const lockID = 100020001

if err := withAdvisoryLock(ctx, sh.localDB, lockID, func(tx *sql.Tx) error {
sh.log.Debugf("setting up rsources tables in %s", sh.config.LocalHostname)
if err := setupTables(ctx, sh.localDB, sh.config.LocalHostname, sh.log); err != nil {
return err
}
if err := migrateFailedKeysTable(ctx, tx); err != nil {
return fmt.Errorf("migrating rsources_failed_keys table: %w", err)
}
sh.log.Debugf("rsources tables setup successfully in %s", sh.config.LocalHostname)
return nil
}); err != nil {
return err
}
if err := migrateFailedKeysTable(ctx, sh.localDB); err != nil {
return fmt.Errorf("failed to migrate rsources_failed_keys table: %w", err)
}
sh.log.Debugf("rsources tables setup successfully in %s", sh.config.LocalHostname)

if sh.sharedDB != nil {
sh.log.Debugf("setting up rsources tables for shared db %s", sh.config.SharedConn)
if err := setupTables(ctx, sh.sharedDB, "shared", sh.log); err != nil {
return err
}
sh.log.Debugf("rsources tables for shared db %s setup successfully", sh.config.SharedConn)
if err := withAdvisoryLock(ctx, sh.sharedDB, lockID, func(_ *sql.Tx) error {
sh.log.Debugf("setting up rsources tables for shared db %s", sh.config.SharedConn)
if err := setupTables(ctx, sh.sharedDB, "shared", sh.log); err != nil {
return err
}
sh.log.Debugf("rsources tables for shared db %s setup successfully", sh.config.SharedConn)

sh.log.Debugf("setting up rsources logical replication in %s", sh.config.LocalHostname)
if err := sh.setupLogicalReplication(ctx); err != nil {
return fmt.Errorf("failed to setup rsources logical replication in %s: %w", sh.config.LocalHostname, err)
sh.log.Debugf("setting up rsources logical replication in %s", sh.config.LocalHostname)
if err := sh.setupLogicalReplication(ctx); err != nil {
return fmt.Errorf("logical replication in %q: %w", sh.config.LocalHostname, err)
}
sh.log.Debugf("rsources logical replication setup successfully in %s", sh.config.LocalHostname)
return nil
}); err != nil {
return err
}
sh.log.Debugf("rsources logical replication setup successfully in %s", sh.config.LocalHostname)
}
return nil
}
Expand All @@ -511,15 +525,7 @@ func setupTables(ctx context.Context, db *sql.DB, defaultDbName string, log logg
}

// TODO: Remove this after a few releases
func migrateFailedKeysTable(ctx context.Context, db *sql.DB) error {
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("failed to start transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
if _, err := tx.ExecContext(ctx, `SELECT pg_advisory_xact_lock(100020001)`); err != nil {
return fmt.Errorf("error while acquiring advisory lock for failed keys migration: %w", err)
}
func migrateFailedKeysTable(ctx context.Context, tx *sql.Tx) error {
var previousTableExists bool
row := tx.QueryRowContext(ctx, `SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname NOT IN ('pg_catalog','information_schema') AND tablename = 'rsources_failed_keys')`)
if err := row.Scan(&previousTableExists); err != nil {
Expand Down Expand Up @@ -597,7 +603,7 @@ func migrateFailedKeysTable(ctx context.Context, db *sql.DB) error {
if _, err := tx.ExecContext(ctx, `drop function if exists ksuid()`); err != nil {
return fmt.Errorf("failed to drop ksuid function: %w", err)
}
return tx.Commit()
return nil
}

return nil
Expand Down Expand Up @@ -803,3 +809,18 @@ func (sh *sourcesHandler) Monitor(ctx context.Context, lagGauge, replicationSlot
}
}
}

func withAdvisoryLock(ctx context.Context, db *sql.DB, lockId int64, f func(tx *sql.Tx) error) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to start transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
if _, err := tx.ExecContext(ctx, `SELECT pg_advisory_xact_lock($1)`, lockId); err != nil {
return fmt.Errorf("acquiring advisory lock: %w", err)
}
if err := f(tx); err != nil {
return err
}
return tx.Commit()
}
12 changes: 6 additions & 6 deletions services/rsources/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var _ = Describe("Using sources handler", func() {
Out: 4,
Failed: 6,
}
BeforeAll(func() {
BeforeEach(func() {
var err error
pool, err = dockertest.NewPool("")
Expect(err).NotTo(HaveOccurred())
Expand All @@ -65,7 +65,7 @@ var _ = Describe("Using sources handler", func() {
sh = createService(config)
})

AfterAll(func() {
AfterEach(func() {
purgeResources(pool, resource.resource)
})

Expand Down Expand Up @@ -655,7 +655,7 @@ var _ = Describe("Using sources handler", func() {
serviceA, serviceB JobService
)

BeforeAll(func() {
BeforeEach(func() {
var err error
pool, err = dockertest.NewPool("")
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -700,7 +700,7 @@ var _ = Describe("Using sources handler", func() {
serviceB = createService(configB)
})

AfterAll(func() {
AfterEach(func() {
purgeResources(pool, pgA.resource, pgB.resource, pgC.resource)
if network != nil {
_ = pool.Client.RemoveNetwork(network.ID)
Expand Down Expand Up @@ -888,7 +888,7 @@ var _ = Describe("Using sources handler", func() {
serviceA JobService
)

BeforeAll(func() {
BeforeEach(func() {
var err error
pool, err = dockertest.NewPool("")
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -918,7 +918,7 @@ var _ = Describe("Using sources handler", func() {
serviceA = createService(configA)
})

AfterAll(func() {
AfterEach(func() {
purgeResources(pool, pgA.resource, pgB.resource)
if network != nil {
_ = pool.Client.RemoveNetwork(network.ID)
Expand Down
8 changes: 4 additions & 4 deletions services/rsources/handler_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var _ = Describe("Using sources handler v1", func() {
Out: 4,
Failed: 6,
}
BeforeAll(func() {
BeforeEach(func() {
var err error
pool, err = dockertest.NewPool("")
Expect(err).NotTo(HaveOccurred())
Expand All @@ -45,7 +45,7 @@ var _ = Describe("Using sources handler v1", func() {
sh = createService(config)
})

AfterAll(func() {
AfterEach(func() {
purgeResources(pool, resource.resource)
})

Expand Down Expand Up @@ -337,7 +337,7 @@ var _ = Describe("Using sources handler v1", func() {
serviceA, serviceB JobService
)

BeforeAll(func() {
BeforeEach(func() {
var err error
pool, err = dockertest.NewPool("")
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -382,7 +382,7 @@ var _ = Describe("Using sources handler v1", func() {
serviceB = createService(configB)
})

AfterAll(func() {
AfterEach(func() {
purgeResources(pool, pgA.resource, pgB.resource, pgC.resource)
if network != nil {
_ = pool.Client.RemoveNetwork(network.ID)
Expand Down
3 changes: 3 additions & 0 deletions services/rsources/rsources.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ func NewJobService(config JobServiceConfig) (JobService, error) {
if config.Log == nil {
config.Log = logger.NewLogger().Child("rsources")
}
if config.MaxPoolSize <= 2 {
config.MaxPoolSize = 2 // minimum 2 connections in the pool for proper startup
}
var (
localDB, sharedDB *sql.DB
err error
Expand Down

0 comments on commit 42d5130

Please sign in to comment.