Skip to content

Commit

Permalink
chore: fix namespace bug (#3110)
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Mar 17, 2023
1 parent 0d9af35 commit 7b6fa35
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 11 deletions.
6 changes: 6 additions & 0 deletions warehouse/testdata/sql/namespace_test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
BEGIN;
INSERT INTO wh_schemas(id,wh_upload_id, source_id, namespace, destination_id, destination_type,
schema, error, created_at, updated_at)
VALUES (1,1, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES','{}', NULL,
'2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685');
COMMIT;
26 changes: 15 additions & 11 deletions warehouse/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (wh *HandleT) backendConfigSubscriber(ctx context.Context) {
destination = wh.attachSSHTunnellingInfo(ctx, destination)
}

namespace := wh.getNamespace(destination.Config, source, destination, wh.destType)
namespace := wh.getNamespace(source, destination)
warehouse := model.Warehouse{
WorkspaceID: workspaceID,
Source: source,
Expand Down Expand Up @@ -400,9 +400,9 @@ func deepCopy(src, dest interface{}) error {
// 1. user set name from destinationConfig
// 2. from existing record in wh_schemas with same source + dest combo
// 3. convert source name
func (wh *HandleT) getNamespace(configI interface{}, source backendconfig.SourceT, destination backendconfig.DestinationT, destType string) string {
configMap := configI.(map[string]interface{})
if destType == warehouseutils.CLICKHOUSE {
func (wh *HandleT) getNamespace(source backendconfig.SourceT, destination backendconfig.DestinationT) string {
configMap := destination.Config
if wh.destType == warehouseutils.CLICKHOUSE {
if _, ok := configMap["database"].(string); ok {
return configMap["database"].(string)
}
Expand All @@ -411,18 +411,22 @@ func (wh *HandleT) getNamespace(configI interface{}, source backendconfig.Source
if configMap["namespace"] != nil {
namespace, _ := configMap["namespace"].(string)
if len(strings.TrimSpace(namespace)) > 0 {
return warehouseutils.ToProviderCase(destType, warehouseutils.ToSafeNamespace(destType, namespace))
return warehouseutils.ToProviderCase(wh.destType, warehouseutils.ToSafeNamespace(wh.destType, namespace))
}
}
// TODO: Move config to global level based on use case
namespacePrefix := config.GetString(fmt.Sprintf("Warehouse.%s.customDatasetPrefix", warehouseutils.WHDestNameMap[destType]), "")
namespacePrefix := config.GetString(fmt.Sprintf("Warehouse.%s.customDatasetPrefix", warehouseutils.WHDestNameMap[wh.destType]), "")
if namespacePrefix != "" {
return warehouseutils.ToProviderCase(destType, warehouseutils.ToSafeNamespace(destType, fmt.Sprintf(`%s_%s`, namespacePrefix, source.Name)))
return warehouseutils.ToProviderCase(wh.destType, warehouseutils.ToSafeNamespace(wh.destType, fmt.Sprintf(`%s_%s`, namespacePrefix, source.Name)))
}
if _, exists := warehouseutils.GetNamespace(source, destination, wh.dbHandle); !exists {
return warehouseutils.ToProviderCase(destType, warehouseutils.ToSafeNamespace(destType, source.Name))
var (
namespace string
exists bool
)
if namespace, exists = warehouseutils.GetNamespace(source, destination, wh.dbHandle); !exists {
return warehouseutils.ToProviderCase(wh.destType, warehouseutils.ToSafeNamespace(wh.destType, source.Name))
}
return ""
return namespace
}

func (wh *HandleT) setDestInProgress(warehouse model.Warehouse, jobID int64) {
Expand Down Expand Up @@ -968,7 +972,7 @@ func minimalConfigSubscriber() {
dbHandle: dbHandle,
destType: destination.DestinationDefinition.Name,
}
namespace := wh.getNamespace(destination.Config, source, destination, wh.destType)
namespace := wh.getNamespace(source, destination)
connectionsMapLock.Lock()
if connectionsMap[destination.ID] == nil {
connectionsMap[destination.ID] = map[string]model.Warehouse{}
Expand Down
164 changes: 164 additions & 0 deletions warehouse/warehouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
postgreslegacy "github.com/rudderlabs/rudder-server/warehouse/integrations/postgres-legacy"

"github.com/rudderlabs/rudder-server/warehouse/integrations/postgres"
Expand Down Expand Up @@ -187,3 +188,166 @@ func TestUploadJob_ProcessingStats(t *testing.T) {
})
}
}

func Test_GetNamespace(t *testing.T) {
testcases := []struct {
config map[string]interface{}
source backendconfig.SourceT
destination backendconfig.DestinationT
destType string
result string
setConfig bool
}{
{
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{
"database": "test_db",
},
},
destType: warehouseutils.CLICKHOUSE,
result: "test_db",
setConfig: false,
},
{
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{},
},
destType: warehouseutils.CLICKHOUSE,
result: "rudder",
setConfig: false,
},
{
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{
"namespace": "test_namespace",
},
},
destType: "test-destinationType-1",
result: "test_namespace",
setConfig: false,
},
{
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{
"namespace": " test_namespace ",
},
},
destType: "test-destinationType-1",
result: "test_namespace",
setConfig: false,
},
{
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{
"namespace": "##",
},
},
destType: "test-destinationType-1",
result: "stringempty",
setConfig: false,
},
{
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{
"namespace": "##evrnvrv$vtr&^",
},
},
destType: "test-destinationType-1",
result: "evrnvrv_vtr",
setConfig: false,
},
{
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{},
},
destType: "test-destinationType-1",
result: "config_result",
setConfig: true,
},
{
source: backendconfig.SourceT{
Name: "test-source",
ID: "test-sourceID",
},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{},
ID: "test-destinationID",
},
destType: "test-destinationType-1",
result: "test-namespace",
setConfig: false,
},
{
source: backendconfig.SourceT{
Name: "test-source",
ID: "random-sourceID",
},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{},
ID: "random-destinationID",
},
destType: "test-destinationType-1",
result: "test_source",
setConfig: false,
},
{
source: backendconfig.SourceT{
Name: "test-source",
ID: "test-sourceID",
},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{},
ID: "test-destinationID",
},
destType: "test-destinationType-1",
result: "config_result_test_source",
setConfig: true,
},
}

for _, tc := range testcases {
tc := tc
t.Run("should return namespace", func(t *testing.T) {
t.Parallel()

pool, err := dockertest.NewPool("")
require.NoError(t, err)

pgResource, err := destination.SetupPostgres(pool, t)
require.NoError(t, err)

err = (&migrator.Migrator{
Handle: pgResource.DB,
MigrationsTable: "wh_schema_migrations",
}).Migrate("warehouse")

require.NoError(t, err)
store := memstats.New()
wh := HandleT{
destType: tc.destType,
stats: store,
dbHandle: pgResource.DB,
}
if tc.setConfig {
config.Set(fmt.Sprintf("Warehouse.%s.customDatasetPrefix", warehouseutils.WHDestNameMap[tc.destType]), "config_result")
}

sqlStatement, err := os.ReadFile("testdata/sql/namespace_test.sql")
require.NoError(t, err)

_, err = pgResource.DB.Exec(string(sqlStatement))
require.NoError(t, err)

namespace := wh.getNamespace(tc.source, tc.destination)
require.Equal(t, tc.result, namespace)
config.Reset()
})
}
}

0 comments on commit 7b6fa35

Please sign in to comment.