diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index a7ce42d63c..2bcbf51b65 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -76,7 +76,7 @@ jobs: username: rudderlabs password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Warehouse Service Integration [ ${{ matrix.destination }} ] - run: make test-warehouse package=./${{ matrix.package }}/... + run: make test-warehouse package=${{ matrix.package }} env: BIGQUERY_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDENTIALS }} DATABRICKS_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.DATABRICKS_INTEGRATION_TEST_CREDENTIALS }} @@ -123,8 +123,13 @@ jobs: - regulation-worker - router - services + - services/rsources - suppression-backup-service - warehouse + include: + - package: services + exclude: services/rsources + steps: - uses: actions/checkout@v4 - uses: actions/setup-go@v4 @@ -143,7 +148,7 @@ jobs: TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING: ${{ secrets.TEST_KAFKA_AZURE_EVENT_HUBS_CLOUD_CONNECTION_STRING }} TEST_S3_DATALAKE_CREDENTIALS: ${{ secrets.TEST_S3_DATALAKE_CREDENTIALS }} BIGQUERY_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDENTIALS }} - run: make test package=./${{ matrix.package }}/... + run: make test exclude="${{ matrix.exclude }}" package=${{ matrix.package }} - name: Sanitize name for Artifact run: | name=$(echo -n "${{ matrix.package }}" | sed -e 's/[ \t:\/\\"<>|*?]/-/g' -e 's/--*/-/g') diff --git a/Makefile b/Makefile index 00d30b349f..10a5c08c40 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,12 @@ else $(eval TEST_OPTIONS = -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=atomic -coverpkg=./... -vet=all --timeout=15m) endif ifdef package - $(TEST_CMD) $(TEST_OPTIONS) $(package) && touch $(TESTFILE) || true +ifdef exclude + $(eval FILES = `go list ./$(package)/... | egrep -iv '$(exclude)'`) + $(TEST_CMD) -count=1 $(TEST_OPTIONS) $(FILES) && touch $(TESTFILE) || true +else + $(TEST_CMD) $(TEST_OPTIONS) ./$(package)/... && touch $(TESTFILE) || true +endif else ifdef exclude $(eval FILES = `go list ./... | egrep -iv '$(exclude)'`) $(TEST_CMD) -count=1 $(TEST_OPTIONS) $(FILES) && touch $(TESTFILE) || true @@ -32,7 +37,7 @@ test-warehouse-integration: $(eval TEST_PATTERN = 'TestIntegration') $(eval TEST_CMD = SLOW=1 go test) $(eval TEST_OPTIONS = -v -p 8 -timeout 30m -count 1 -run $(TEST_PATTERN) -coverprofile=profile.out -covermode=atomic -coverpkg=./...) - $(TEST_CMD) $(TEST_OPTIONS) $(package) && touch $(TESTFILE) || true + $(TEST_CMD) $(TEST_OPTIONS) ./$(package)/... && touch $(TESTFILE) || true test-warehouse: test-warehouse-integration test-teardown diff --git a/services/rsources/handler.go b/services/rsources/handler.go index 2877791e0c..3ee9967c57 100644 --- a/services/rsources/handler.go +++ b/services/rsources/handler.go @@ -476,26 +476,40 @@ func (sh *sourcesHandler) init() error { return time.After(config.GetDuration("Rsources.stats.cleanup.interval", 1, time.Hour)) } } - sh.log.Debugf("setting up rsources tables in %s", sh.config.LocalHostname) - if err := setupTables(ctx, sh.localDB, sh.config.LocalHostname, sh.log); err != nil { + + const lockId = 100020001 + + if err := withAdvisoryLock(ctx, sh.localDB, lockId, func(tx *sql.Tx) error { + sh.log.Debugf("setting up rsources tables in %s", sh.config.LocalHostname) + if err := setupTables(ctx, sh.localDB, sh.config.LocalHostname, sh.log); err != nil { + return err + } + if err := migrateFailedKeysTable(ctx, tx); err != nil { + return fmt.Errorf("failed to migrate rsources_failed_keys table: %w", err) + } + sh.log.Debugf("rsources tables setup successfully in %s", sh.config.LocalHostname) + return nil + }); err != nil { return err } - if err := migrateFailedKeysTable(ctx, sh.localDB); err != nil { - return fmt.Errorf("failed to migrate rsources_failed_keys table: %w", err) - } - sh.log.Debugf("rsources tables setup successfully in %s", sh.config.LocalHostname) + if sh.sharedDB != nil { - sh.log.Debugf("setting up rsources tables for shared db %s", sh.config.SharedConn) - if err := setupTables(ctx, sh.sharedDB, "shared", sh.log); err != nil { - return err - } - sh.log.Debugf("rsources tables for shared db %s setup successfully", sh.config.SharedConn) + if err := withAdvisoryLock(ctx, sh.sharedDB, lockId, func(_ *sql.Tx) error { + sh.log.Debugf("setting up rsources tables for shared db %s", sh.config.SharedConn) + if err := setupTables(ctx, sh.sharedDB, "shared", sh.log); err != nil { + return err + } + sh.log.Debugf("rsources tables for shared db %s setup successfully", sh.config.SharedConn) - sh.log.Debugf("setting up rsources logical replication in %s", sh.config.LocalHostname) - if err := sh.setupLogicalReplication(ctx); err != nil { - return fmt.Errorf("failed to setup rsources logical replication in %s: %w", sh.config.LocalHostname, err) + sh.log.Debugf("setting up rsources logical replication in %s", sh.config.LocalHostname) + if err := sh.setupLogicalReplication(ctx); err != nil { + return fmt.Errorf("failed to setup rsources logical replication in %s: %w", sh.config.LocalHostname, err) + } + sh.log.Debugf("rsources logical replication setup successfully in %s", sh.config.LocalHostname) + return nil + }); err != nil { + return err } - sh.log.Debugf("rsources logical replication setup successfully in %s", sh.config.LocalHostname) } return nil } @@ -511,15 +525,7 @@ func setupTables(ctx context.Context, db *sql.DB, defaultDbName string, log logg } // TODO: Remove this after a few releases -func migrateFailedKeysTable(ctx context.Context, db *sql.DB) error { - tx, err := db.Begin() - if err != nil { - return fmt.Errorf("failed to start transaction: %w", err) - } - defer func() { _ = tx.Rollback() }() - if _, err := tx.ExecContext(ctx, `SELECT pg_advisory_xact_lock(100020001)`); err != nil { - return fmt.Errorf("error while acquiring advisory lock for failed keys migration: %w", err) - } +func migrateFailedKeysTable(ctx context.Context, tx *sql.Tx) error { var previousTableExists bool row := tx.QueryRowContext(ctx, `SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname NOT IN ('pg_catalog','information_schema') AND tablename = 'rsources_failed_keys')`) if err := row.Scan(&previousTableExists); err != nil { @@ -597,7 +603,7 @@ func migrateFailedKeysTable(ctx context.Context, db *sql.DB) error { if _, err := tx.ExecContext(ctx, `drop function if exists ksuid()`); err != nil { return fmt.Errorf("failed to drop ksuid function: %w", err) } - return tx.Commit() + return nil } return nil @@ -803,3 +809,18 @@ func (sh *sourcesHandler) Monitor(ctx context.Context, lagGauge, replicationSlot } } } + +func withAdvisoryLock(ctx context.Context, db *sql.DB, lockId int64, f func(tx *sql.Tx) error) error { + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to start transaction: %w", err) + } + defer func() { _ = tx.Rollback() }() + if _, err := tx.ExecContext(ctx, `SELECT pg_advisory_xact_lock($1)`, lockId); err != nil { + return fmt.Errorf("error while acquiring advisory lock: %w", err) + } + if err := f(tx); err != nil { + return err + } + return tx.Commit() +} diff --git a/services/rsources/handler_test.go b/services/rsources/handler_test.go index d8a6ce0698..be53817310 100644 --- a/services/rsources/handler_test.go +++ b/services/rsources/handler_test.go @@ -51,7 +51,7 @@ var _ = Describe("Using sources handler", func() { Out: 4, Failed: 6, } - BeforeAll(func() { + BeforeEach(func() { var err error pool, err = dockertest.NewPool("") Expect(err).NotTo(HaveOccurred()) @@ -65,7 +65,7 @@ var _ = Describe("Using sources handler", func() { sh = createService(config) }) - AfterAll(func() { + AfterEach(func() { purgeResources(pool, resource.resource) }) @@ -655,7 +655,7 @@ var _ = Describe("Using sources handler", func() { serviceA, serviceB JobService ) - BeforeAll(func() { + BeforeEach(func() { var err error pool, err = dockertest.NewPool("") Expect(err).NotTo(HaveOccurred()) @@ -700,7 +700,7 @@ var _ = Describe("Using sources handler", func() { serviceB = createService(configB) }) - AfterAll(func() { + AfterEach(func() { purgeResources(pool, pgA.resource, pgB.resource, pgC.resource) if network != nil { _ = pool.Client.RemoveNetwork(network.ID) @@ -888,7 +888,7 @@ var _ = Describe("Using sources handler", func() { serviceA JobService ) - BeforeAll(func() { + BeforeEach(func() { var err error pool, err = dockertest.NewPool("") Expect(err).NotTo(HaveOccurred()) @@ -918,7 +918,7 @@ var _ = Describe("Using sources handler", func() { serviceA = createService(configA) }) - AfterAll(func() { + AfterEach(func() { purgeResources(pool, pgA.resource, pgB.resource) if network != nil { _ = pool.Client.RemoveNetwork(network.ID) diff --git a/services/rsources/handler_v1_test.go b/services/rsources/handler_v1_test.go index f648f19f50..b4dc9c7ee1 100644 --- a/services/rsources/handler_v1_test.go +++ b/services/rsources/handler_v1_test.go @@ -31,7 +31,7 @@ var _ = Describe("Using sources handler v1", func() { Out: 4, Failed: 6, } - BeforeAll(func() { + BeforeEach(func() { var err error pool, err = dockertest.NewPool("") Expect(err).NotTo(HaveOccurred()) @@ -45,7 +45,7 @@ var _ = Describe("Using sources handler v1", func() { sh = createService(config) }) - AfterAll(func() { + AfterEach(func() { purgeResources(pool, resource.resource) }) @@ -337,7 +337,7 @@ var _ = Describe("Using sources handler v1", func() { serviceA, serviceB JobService ) - BeforeAll(func() { + BeforeEach(func() { var err error pool, err = dockertest.NewPool("") Expect(err).NotTo(HaveOccurred()) @@ -382,7 +382,7 @@ var _ = Describe("Using sources handler v1", func() { serviceB = createService(configB) }) - AfterAll(func() { + AfterEach(func() { purgeResources(pool, pgA.resource, pgB.resource, pgC.resource) if network != nil { _ = pool.Client.RemoveNetwork(network.ID) diff --git a/services/rsources/rsources.go b/services/rsources/rsources.go index 2c9b3d25b2..0b0d35f024 100644 --- a/services/rsources/rsources.go +++ b/services/rsources/rsources.go @@ -199,6 +199,9 @@ func NewJobService(config JobServiceConfig) (JobService, error) { if config.Log == nil { config.Log = logger.NewLogger().Child("rsources") } + if config.MaxPoolSize <= 2 { + config.MaxPoolSize = 2 // minimum 2 connections in the pool for proper startup + } var ( localDB, sharedDB *sql.DB err error