diff --git a/warehouse/backend_config.go b/warehouse/backend_config.go index 03c2844c56b..85ed5cc4a0c 100644 --- a/warehouse/backend_config.go +++ b/warehouse/backend_config.go @@ -3,11 +3,13 @@ package warehouse import ( "context" "encoding/json" + "fmt" + "strings" "sync" - "github.com/samber/lo" + "github.com/rudderlabs/rudder-server/warehouse/logfield" - "golang.org/x/exp/slices" + "github.com/samber/lo" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" @@ -16,7 +18,6 @@ import ( "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" "github.com/rudderlabs/rudder-server/warehouse/internal/model" "github.com/rudderlabs/rudder-server/warehouse/internal/repo" - warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" whutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) @@ -101,7 +102,6 @@ func (s *backendConfigManager) Subscribe(ctx context.Context) <-chan []model.War s.subscriptionsMu.Lock() defer s.subscriptionsMu.Unlock() - idx := len(s.subscriptions) ch := make(chan []model.Warehouse, 10) s.subscriptions = append(s.subscriptions, ch) @@ -119,7 +119,12 @@ func (s *backendConfigManager) Subscribe(ctx context.Context) <-chan []model.War close(ch) - s.subscriptions = append(s.subscriptions[:idx], s.subscriptions[idx+1:]...) + for i, item := range s.subscriptions { + if item == ch { + s.subscriptions = append(s.subscriptions[:i], s.subscriptions[i+1:]...) + return + } + } }() return ch @@ -134,6 +139,7 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string] warehouses []model.Warehouse connectionFlags backendconfig.ConnectionFlags sourceIDsByWorkspace = make(map[string][]string) + connectionsMap = make(map[string]map[string]model.Warehouse) ) for workspaceID, wConfig := range data { @@ -147,17 +153,11 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string] sourceIDsByWorkspace[workspaceID] = append(sourceIDsByWorkspace[workspaceID], source.ID) for _, destination := range source.Destinations { - if _, ok := warehouseutils.WarehouseDestinationMap[destination.DestinationDefinition.Name]; !ok { + if _, ok := whutils.WarehouseDestinationMap[destination.DestinationDefinition.Name]; !ok { s.logger.Debugf("Not a warehouse destination, skipping %s", destination.DestinationDefinition.Name) continue } - wh := &HandleT{ - dbHandle: s.db, - whSchemaRepo: s.schema, - conf: s.conf, - destType: destination.DestinationDefinition.Name, - } if s.internalControlPlaneClient != nil { destination = s.attachSSHTunnellingInfo(ctx, destination) } @@ -166,41 +166,35 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string] Source: source, WorkspaceID: workspaceID, Destination: destination, - Type: wh.destType, - Namespace: wh.getNamespace(ctx, source, destination), - Identifier: whutils.GetWarehouseIdentifier(wh.destType, source.ID, destination.ID), + Type: destination.DestinationDefinition.Name, + Namespace: s.namespace(ctx, source, destination), + Identifier: whutils.GetWarehouseIdentifier(destination.DestinationDefinition.Name, source.ID, destination.ID), } warehouses = append(warehouses, warehouse) - s.connectionsMapMu.Lock() - if _, ok := s.connectionsMap[destination.ID]; !ok { - s.connectionsMap[destination.ID] = make(map[string]model.Warehouse) + if _, ok := connectionsMap[destination.ID]; !ok { + connectionsMap[destination.ID] = make(map[string]model.Warehouse) } - s.connectionsMap[destination.ID][source.ID] = warehouse - s.connectionsMapMu.Unlock() + connectionsMap[destination.ID][source.ID] = warehouse if destination.Config["sslMode"] == "verify-ca" { if err := whutils.WriteSSLKeys(destination); err.IsError() { s.logger.Error(err.Error()) persistSSLFileErrorStat( - workspaceID, wh.destType, destination.Name, destination.ID, + workspaceID, destination.DestinationDefinition.Name, destination.Name, destination.ID, source.Name, source.ID, err.GetErrTag(), ) } } - - if whutils.IDResolutionEnabled() && slices.Contains(whutils.IdentityEnabledWarehouses, warehouse.Type) { - wh.setupIdentityTables(ctx, warehouse) - if shouldPopulateHistoricIdentities && warehouse.Destination.Enabled { - // non-blocking populate historic identities - wh.populateHistoricIdentities(ctx, warehouse) - } - } } } } + s.connectionsMapMu.Lock() + s.connectionsMap = connectionsMap + s.connectionsMapMu.Unlock() + s.warehousesMu.Lock() s.warehouses = warehouses // TODO how is this used? because we are duplicating data s.warehousesMu.Unlock() @@ -222,6 +216,49 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string] } } +// namespace gives the namespace for the warehouse in the following order +// 1. user set name from destinationConfig +// 2. from existing record in wh_schemas with same source + dest combo +// 3. convert source name +func (s *backendConfigManager) namespace(ctx context.Context, source backendconfig.SourceT, destination backendconfig.DestinationT) string { + destType := destination.DestinationDefinition.Name + destConfig := destination.Config + + if destType == whutils.CLICKHOUSE { + if database, ok := destConfig["database"].(string); ok { + return database + } + return "rudder" + } + + if destConfig["namespace"] != nil { + namespace, _ := destConfig["namespace"].(string) + if len(strings.TrimSpace(namespace)) > 0 { + return whutils.ToProviderCase(destType, whutils.ToSafeNamespace(destType, namespace)) + } + } + + namespacePrefix := s.conf.GetString(fmt.Sprintf("Warehouse.%s.customDatasetPrefix", whutils.WHDestNameMap[destType]), "") + if namespacePrefix != "" { + return whutils.ToProviderCase(destType, whutils.ToSafeNamespace(destType, fmt.Sprintf(`%s_%s`, namespacePrefix, source.Name))) + } + + namespace, err := s.schema.GetNamespace(ctx, source.ID, destination.ID) + if err != nil { + s.logger.Errorw("getting namespace", + logfield.SourceID, source.ID, + logfield.DestinationID, destination.ID, + logfield.DestinationType, destination.DestinationDefinition.Name, + logfield.WorkspaceID, destination.WorkspaceID, + ) + return "" + } + if namespace == "" { + return whutils.ToProviderCase(destType, whutils.ToSafeNamespace(destType, source.Name)) + } + return namespace +} + func (s *backendConfigManager) IsInitialized() bool { select { case <-s.initialConfigFetched: @@ -275,7 +312,7 @@ func (s *backendConfigManager) attachSSHTunnellingInfo( upstream backendconfig.DestinationT, ) backendconfig.DestinationT { // at destination level, do we have tunnelling enabled. - if tunnelEnabled := warehouseutils.ReadAsBool("useSSH", upstream.Config); !tunnelEnabled { + if tunnelEnabled := whutils.ReadAsBool("useSSH", upstream.Config); !tunnelEnabled { return upstream } diff --git a/warehouse/backend_config_test.go b/warehouse/backend_config_test.go index fe787d2855e..801ac723e31 100644 --- a/warehouse/backend_config_test.go +++ b/warehouse/backend_config_test.go @@ -2,10 +2,14 @@ package warehouse import ( "context" + "fmt" "net/http" "net/http/httptest" + "os" "testing" + migrator "github.com/rudderlabs/rudder-server/services/sql-migrator" + "github.com/golang/mock/gomock" "github.com/ory/dockertest/v3" "github.com/stretchr/testify/require" @@ -104,9 +108,9 @@ func TestBackendConfigManager(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - bcm := newBackendConfigManager(c, db, mockBackendConfig, logger.NOP) - t.Run("Subscriptions", func(t *testing.T) { + bcm := newBackendConfigManager(c, db, mockBackendConfig, logger.NOP) + require.False(t, bcm.IsInitialized()) require.Equal(t, bcm.Connections(), map[string]map[string]model.Warehouse{}) @@ -181,6 +185,8 @@ func TestBackendConfigManager(t *testing.T) { }) t.Run("Tunnelling", func(t *testing.T) { + bcm := newBackendConfigManager(c, db, mockBackendConfig, logger.NOP) + testCases := []struct { name string input backendconfig.DestinationT @@ -244,4 +250,219 @@ func TestBackendConfigManager(t *testing.T) { }) } }) + + t.Run("Many subscribers", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + numSubscribers := 1000 + bcm := newBackendConfigManager(c, db, mockBackendConfig, logger.NOP) + subscriptionsChs := make([]<-chan []model.Warehouse, numSubscribers) + + for i := 0; i < numSubscribers; i++ { + subscriptionsChs[i] = bcm.Subscribe(ctx) + } + + go func() { + bcm.Start(ctx) + }() + + for i := 0; i < numSubscribers; i++ { + require.Len(t, <-subscriptionsChs[i], 1) + } + + cancel() + + for i := 0; i < numSubscribers; i++ { + w, ok := <-subscriptionsChs[i] + require.Nil(t, w) + require.False(t, ok) + } + }) +} + +func TestBackendConfigManager_Namespace(t *testing.T) { + testcases := []struct { + config map[string]interface{} + source backendconfig.SourceT + destination backendconfig.DestinationT + expectedNamespace string + setConfig bool + }{ + { + source: backendconfig.SourceT{}, + destination: backendconfig.DestinationT{ + Config: map[string]interface{}{ + "database": "test_db", + }, + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: warehouseutils.CLICKHOUSE, + }, + }, + expectedNamespace: "test_db", + setConfig: false, + }, + { + source: backendconfig.SourceT{}, + destination: backendconfig.DestinationT{ + Config: map[string]interface{}{}, + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: warehouseutils.CLICKHOUSE, + }, + }, + expectedNamespace: "rudder", + setConfig: false, + }, + { + source: backendconfig.SourceT{}, + destination: backendconfig.DestinationT{ + Config: map[string]interface{}{ + "namespace": "test_namespace", + }, + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: "test-destinationType-1", + }, + }, + expectedNamespace: "test_namespace", + setConfig: false, + }, + { + source: backendconfig.SourceT{}, + destination: backendconfig.DestinationT{ + Config: map[string]interface{}{ + "namespace": " test_namespace ", + }, + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: "test-destinationType-1", + }, + }, + expectedNamespace: "test_namespace", + setConfig: false, + }, + { + source: backendconfig.SourceT{}, + destination: backendconfig.DestinationT{ + Config: map[string]interface{}{ + "namespace": "##", + }, + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: "test-destinationType-1", + }, + }, + expectedNamespace: "stringempty", + setConfig: false, + }, + { + source: backendconfig.SourceT{}, + destination: backendconfig.DestinationT{ + Config: map[string]interface{}{ + "namespace": "##evrnvrv$vtr&^", + }, + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: "test-destinationType-1", + }, + }, + expectedNamespace: "evrnvrv_vtr", + setConfig: false, + }, + { + source: backendconfig.SourceT{}, + destination: backendconfig.DestinationT{ + Config: map[string]interface{}{}, + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: "test-destinationType-1", + }, + }, + expectedNamespace: "config_result", + setConfig: true, + }, + { + source: backendconfig.SourceT{ + Name: "test-source", + ID: "test-sourceID", + }, + destination: backendconfig.DestinationT{ + Config: map[string]interface{}{}, + ID: "test-destinationID", + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: "test-destinationType-1", + }, + }, + expectedNamespace: "test-namespace", + setConfig: false, + }, + { + source: backendconfig.SourceT{ + Name: "test-source", + ID: "random-sourceID", + }, + destination: backendconfig.DestinationT{ + Config: map[string]interface{}{}, + ID: "random-destinationID", + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: "test-destinationType-1", + }, + }, + expectedNamespace: "test_source", + setConfig: false, + }, + { + source: backendconfig.SourceT{ + Name: "test-source", + ID: "test-sourceID", + }, + destination: backendconfig.DestinationT{ + Config: map[string]interface{}{}, + ID: "test-destinationID", + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: "test-destinationType-1", + }, + }, + expectedNamespace: "config_result_test_source", + setConfig: true, + }, + } + + for _, tc := range testcases { + tc := tc + + t.Run("should return namespace", func(t *testing.T) { + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + pgResource, err := resource.SetupPostgres(pool, t) + require.NoError(t, err) + + db := sqlquerywrapper.New( + pgResource.DB, + sqlquerywrapper.WithLogger(logger.NOP), + ) + + err = (&migrator.Migrator{ + Handle: db.DB, + MigrationsTable: "wh_schema_migrations", + }).Migrate("warehouse") + require.NoError(t, err) + + c := config.New() + + if tc.setConfig { + c.Set(fmt.Sprintf("Warehouse.%s.customDatasetPrefix", warehouseutils.WHDestNameMap[tc.destination.DestinationDefinition.Name]), "config_result") + } + + sqlStatement, err := os.ReadFile("testdata/sql/namespace_test.sql") + require.NoError(t, err) + + _, err = pgResource.DB.Exec(string(sqlStatement)) + require.NoError(t, err) + + bcm := newBackendConfigManager( + c, db, + mocksBackendConfig.NewMockBackendConfig(gomock.NewController(t)), + logger.NOP, + ) + + namespace := bcm.namespace(context.Background(), tc.source, tc.destination) + require.Equal(t, tc.expectedNamespace, namespace) + }) + } } diff --git a/warehouse/warehouse.go b/warehouse/warehouse.go index 89880bd03b1..b9172053bc6 100644 --- a/warehouse/warehouse.go +++ b/warehouse/warehouse.go @@ -51,7 +51,6 @@ import ( "github.com/rudderlabs/rudder-server/warehouse/internal/repo" "github.com/rudderlabs/rudder-server/warehouse/internal/service" "github.com/rudderlabs/rudder-server/warehouse/jobs" - "github.com/rudderlabs/rudder-server/warehouse/logfield" "github.com/rudderlabs/rudder-server/warehouse/multitenant" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" "github.com/rudderlabs/rudder-server/warehouse/validations" @@ -260,6 +259,16 @@ func (wh *HandleT) backendConfigSubscriber(ctx context.Context) { return warehouse.Destination.DestinationDefinition.Name == wh.destType }) + for _, warehouse := range warehouses { + if warehouseutils.IDResolutionEnabled() && slices.Contains(warehouseutils.IdentityEnabledWarehouses, wh.destType) { + wh.setupIdentityTables(ctx, warehouse) + if shouldPopulateHistoricIdentities && warehouse.Destination.Enabled { + // non-blocking populate historic identities + wh.populateHistoricIdentities(ctx, warehouse) + } + } + } + wh.configSubscriberLock.Lock() wh.warehouses = warehouses if wh.workspaceBySourceIDs == nil { @@ -275,10 +284,6 @@ func (wh *HandleT) backendConfigSubscriber(ctx context.Context) { wh.workerChannelMap = make(map[string]chan *UploadJob) } for _, warehouse := range warehouses { - if warehouse.Destination.DestinationDefinition.Name != wh.destType { - continue - } - workerName := wh.workerIdentifier(warehouse) // spawn one worker for each unique destID_namespace // check this commit to https://github.com/rudderlabs/rudder-server/pull/476/commits/fbfddf167aa9fc63485fe006d34e6881f5019667 @@ -292,46 +297,6 @@ func (wh *HandleT) backendConfigSubscriber(ctx context.Context) { } } -// getNamespace sets namespace name in the following order -// 1. user set name from destinationConfig -// 2. from existing record in wh_schemas with same source + dest combo -// 3. convert source name -func (wh *HandleT) getNamespace(ctx context.Context, source backendconfig.SourceT, destination backendconfig.DestinationT) string { - configMap := destination.Config - if wh.destType == warehouseutils.CLICKHOUSE { - if _, ok := configMap["database"].(string); ok { - return configMap["database"].(string) - } - return "rudder" - } - if configMap["namespace"] != nil { - namespace, _ := configMap["namespace"].(string) - if len(strings.TrimSpace(namespace)) > 0 { - return warehouseutils.ToProviderCase(wh.destType, warehouseutils.ToSafeNamespace(wh.destType, namespace)) - } - } - // TODO: Move config to global level based on use case - namespacePrefix := wh.conf.GetString(fmt.Sprintf("Warehouse.%s.customDatasetPrefix", warehouseutils.WHDestNameMap[wh.destType]), "") - if namespacePrefix != "" { - return warehouseutils.ToProviderCase(wh.destType, warehouseutils.ToSafeNamespace(wh.destType, fmt.Sprintf(`%s_%s`, namespacePrefix, source.Name))) - } - - namespace, err := wh.whSchemaRepo.GetNamespace(ctx, source.ID, destination.ID) - if err != nil { - pkgLogger.Errorw("getting namespace", - logfield.SourceID, source.ID, - logfield.DestinationID, destination.ID, - logfield.DestinationType, destination.DestinationDefinition.Name, - logfield.WorkspaceID, destination.WorkspaceID, - ) - return "" - } - if namespace == "" { - return warehouseutils.ToProviderCase(wh.destType, warehouseutils.ToSafeNamespace(wh.destType, source.Name)) - } - return namespace -} - func (wh *HandleT) setDestInProgress(warehouse model.Warehouse, jobID int64) { identifier := wh.workerIdentifier(warehouse) wh.inProgressMapLock.Lock() @@ -866,9 +831,7 @@ func onConfigDataEvent(ctx context.Context, config map[string]backendconfig.Conf pkgLogger.Debug("Got config from config-backend", config) enabledDestinations := make(map[string]bool) - var connectionFlags backendconfig.ConnectionFlags for _, wConfig := range config { - connectionFlags = wConfig.ConnectionFlags // the last connection flags should be enough, since they are all the same in multi-workspace environments for _, source := range wConfig.Sources { for _, destination := range source.Destinations { enabledDestinations[destination.DestinationDefinition.Name] = true @@ -889,11 +852,6 @@ func onConfigDataEvent(ctx context.Context, config map[string]backendconfig.Conf } } } - if val, ok := connectionFlags.Services["warehouse"]; ok { - if UploadAPI.connectionManager != nil { - UploadAPI.connectionManager.Apply(connectionFlags.URL, val) - } - } keys := lo.Keys(dstToWhRouter) for _, key := range keys { @@ -1447,12 +1405,15 @@ func Start(ctx context.Context, app app.App) error { } }() + g, ctx := errgroup.WithContext(ctx) + bcManager = newBackendConfigManager( config.Default, wrappedDBHandle, backendconfig.DefaultBackendConfig, pkgLogger.Child("wh_bc_manager"), ) - rruntime.GoForWarehouse(func() { + g.Go(func() error { bcManager.Start(ctx) + return nil }) RegisterAdmin(bcManager, pkgLogger) @@ -1476,8 +1437,6 @@ func Start(ctx context.Context, app app.App) error { return fmt.Errorf("cannot setup pgnotifier: %w", err) } - g, ctx := errgroup.WithContext(ctx) - // Setting up reporting client // only if standalone or embedded connecting to diff DB for warehouse if (isStandAlone() && isMaster()) || (misc.GetConnectionString() != psqlInfo) { diff --git a/warehouse/warehouse_test.go b/warehouse/warehouse_test.go index 7c1e487234f..92b2e278b19 100644 --- a/warehouse/warehouse_test.go +++ b/warehouse/warehouse_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" "github.com/rudderlabs/rudder-server/warehouse/internal/repo" @@ -191,173 +190,3 @@ func TestUploadJob_ProcessingStats(t *testing.T) { }) } } - -func Test_GetNamespace(t *testing.T) { - t.Parallel() - - testcases := []struct { - config map[string]interface{} - source backendconfig.SourceT - destination backendconfig.DestinationT - destType string - result string - setConfig bool - }{ - { - source: backendconfig.SourceT{}, - destination: backendconfig.DestinationT{ - Config: map[string]interface{}{ - "database": "test_db", - }, - }, - destType: warehouseutils.CLICKHOUSE, - result: "test_db", - setConfig: false, - }, - { - source: backendconfig.SourceT{}, - destination: backendconfig.DestinationT{ - Config: map[string]interface{}{}, - }, - destType: warehouseutils.CLICKHOUSE, - result: "rudder", - setConfig: false, - }, - { - source: backendconfig.SourceT{}, - destination: backendconfig.DestinationT{ - Config: map[string]interface{}{ - "namespace": "test_namespace", - }, - }, - destType: "test-destinationType-1", - result: "test_namespace", - setConfig: false, - }, - { - source: backendconfig.SourceT{}, - destination: backendconfig.DestinationT{ - Config: map[string]interface{}{ - "namespace": " test_namespace ", - }, - }, - destType: "test-destinationType-1", - result: "test_namespace", - setConfig: false, - }, - { - source: backendconfig.SourceT{}, - destination: backendconfig.DestinationT{ - Config: map[string]interface{}{ - "namespace": "##", - }, - }, - destType: "test-destinationType-1", - result: "stringempty", - setConfig: false, - }, - { - source: backendconfig.SourceT{}, - destination: backendconfig.DestinationT{ - Config: map[string]interface{}{ - "namespace": "##evrnvrv$vtr&^", - }, - }, - destType: "test-destinationType-1", - result: "evrnvrv_vtr", - setConfig: false, - }, - { - source: backendconfig.SourceT{}, - destination: backendconfig.DestinationT{ - Config: map[string]interface{}{}, - }, - destType: "test-destinationType-1", - result: "config_result", - setConfig: true, - }, - { - source: backendconfig.SourceT{ - Name: "test-source", - ID: "test-sourceID", - }, - destination: backendconfig.DestinationT{ - Config: map[string]interface{}{}, - ID: "test-destinationID", - }, - destType: "test-destinationType-1", - result: "test-namespace", - setConfig: false, - }, - { - source: backendconfig.SourceT{ - Name: "test-source", - ID: "random-sourceID", - }, - destination: backendconfig.DestinationT{ - Config: map[string]interface{}{}, - ID: "random-destinationID", - }, - destType: "test-destinationType-1", - result: "test_source", - setConfig: false, - }, - { - source: backendconfig.SourceT{ - Name: "test-source", - ID: "test-sourceID", - }, - destination: backendconfig.DestinationT{ - Config: map[string]interface{}{}, - ID: "test-destinationID", - }, - destType: "test-destinationType-1", - result: "config_result_test_source", - setConfig: true, - }, - } - - for _, tc := range testcases { - tc := tc - t.Run("should return namespace", func(t *testing.T) { - t.Parallel() - - pool, err := dockertest.NewPool("") - require.NoError(t, err) - - pgResource, err := resource.SetupPostgres(pool, t) - require.NoError(t, err) - - t.Log("db:", pgResource.DBDsn) - - err = (&migrator.Migrator{ - Handle: pgResource.DB, - MigrationsTable: "wh_schema_migrations", - }).Migrate("warehouse") - require.NoError(t, err) - - conf := config.New() - store := memstats.New() - - wh := HandleT{ - destType: tc.destType, - stats: store, - dbHandle: sqlquerywrapper.New(pgResource.DB), - whSchemaRepo: repo.NewWHSchemas(sqlquerywrapper.New(pgResource.DB)), - conf: conf, - } - if tc.setConfig { - conf.Set(fmt.Sprintf("Warehouse.%s.customDatasetPrefix", warehouseutils.WHDestNameMap[tc.destType]), "config_result") - } - - sqlStatement, err := os.ReadFile("testdata/sql/namespace_test.sql") - require.NoError(t, err) - - _, err = pgResource.DB.Exec(string(sqlStatement)) - require.NoError(t, err) - - namespace := wh.getNamespace(context.Background(), tc.source, tc.destination) - require.Equal(t, tc.result, namespace) - }) - } -}