From 141d109a5b4e8f4eda51c20b2c3a867ba9348c7f Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Tue, 8 Aug 2023 18:18:10 +0530 Subject: [PATCH] chore: addressing review coments from #3602 (#3713) --- warehouse/backend_config.go | 5 +++-- warehouse/backend_config_test.go | 25 +++++++++++-------------- warehouse/warehouse.go | 25 ++++++++++++------------- 3 files changed, 26 insertions(+), 29 deletions(-) diff --git a/warehouse/backend_config.go b/warehouse/backend_config.go index 015b3b5b41..96a7e2cc08 100644 --- a/warehouse/backend_config.go +++ b/warehouse/backend_config.go @@ -100,15 +100,15 @@ func (s *backendConfigManager) Start(ctx context.Context) { func (s *backendConfigManager) Subscribe(ctx context.Context) <-chan []model.Warehouse { s.subscriptionsMu.Lock() defer s.subscriptionsMu.Unlock() - s.warehousesMu.Lock() - defer s.warehousesMu.Unlock() ch := make(chan []model.Warehouse, 10) s.subscriptions = append(s.subscriptions, ch) + s.warehousesMu.Lock() if len(s.warehouses) > 0 { ch <- s.warehouses } + s.warehousesMu.Unlock() go func() { <-ctx.Done() @@ -249,6 +249,7 @@ func (s *backendConfigManager) namespace(ctx context.Context, source backendconf logfield.DestinationID, destination.ID, logfield.DestinationType, destination.DestinationDefinition.Name, logfield.WorkspaceID, destination.WorkspaceID, + logfield.Error, err.Error(), ) return "" } diff --git a/warehouse/backend_config_test.go b/warehouse/backend_config_test.go index 2334208bc7..52a8d7f1a2 100644 --- a/warehouse/backend_config_test.go +++ b/warehouse/backend_config_test.go @@ -288,6 +288,7 @@ func TestBackendConfigManager(t *testing.T) { func TestBackendConfigManager_Namespace(t *testing.T) { testcases := []struct { + name string config map[string]interface{} source backendconfig.SourceT destination backendconfig.DestinationT @@ -295,6 +296,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) { setConfig bool }{ { + name: "clickhouse with database configured in config", source: backendconfig.SourceT{}, destination: backendconfig.DestinationT{ Config: map[string]interface{}{ @@ -308,6 +310,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) { setConfig: false, }, { + name: "clickhouse without database configured in config", source: backendconfig.SourceT{}, destination: backendconfig.DestinationT{ Config: map[string]interface{}{}, @@ -319,19 +322,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) { setConfig: false, }, { - source: backendconfig.SourceT{}, - destination: backendconfig.DestinationT{ - Config: map[string]interface{}{ - "namespace": "test_namespace", - }, - DestinationDefinition: backendconfig.DestinationDefinitionT{ - Name: "test-destinationType-1", - }, - }, - expectedNamespace: "test_namespace", - setConfig: false, - }, - { + name: "namespace only contains letters", source: backendconfig.SourceT{}, destination: backendconfig.DestinationT{ Config: map[string]interface{}{ @@ -345,6 +336,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) { setConfig: false, }, { + name: "namespace only contains special characters", source: backendconfig.SourceT{}, destination: backendconfig.DestinationT{ Config: map[string]interface{}{ @@ -358,6 +350,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) { setConfig: false, }, { + name: "namespace contains special characters and letters", source: backendconfig.SourceT{}, destination: backendconfig.DestinationT{ Config: map[string]interface{}{ @@ -371,6 +364,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) { setConfig: false, }, { + name: "empty namespace but config is set", source: backendconfig.SourceT{}, destination: backendconfig.DestinationT{ Config: map[string]interface{}{}, @@ -382,6 +376,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) { setConfig: true, }, { + name: "empty namespace with picking from cache", source: backendconfig.SourceT{ Name: "test-source", ID: "test-sourceID", @@ -397,6 +392,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) { setConfig: false, }, { + name: "destination config without namespace configured and custom dataset prefix is not configured", source: backendconfig.SourceT{ Name: "test-source", ID: "random-sourceID", @@ -412,6 +408,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) { setConfig: false, }, { + name: "destination config without namespace configured and custom dataset prefix configured", source: backendconfig.SourceT{ Name: "test-source", ID: "test-sourceID", @@ -431,7 +428,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) { for _, tc := range testcases { tc := tc - t.Run("should return namespace", func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { pool, err := dockertest.NewPool("") require.NoError(t, err) diff --git a/warehouse/warehouse.go b/warehouse/warehouse.go index 237eb89533..4d6c24a42a 100644 --- a/warehouse/warehouse.go +++ b/warehouse/warehouse.go @@ -1405,13 +1405,13 @@ func Start(ctx context.Context, app app.App) error { } }() - g, ctx := errgroup.WithContext(ctx) + g, gCtx := errgroup.WithContext(ctx) tenantManager = &multitenant.Manager{ BackendConfig: backendconfig.DefaultBackendConfig, } g.Go(func() error { - tenantManager.Run(ctx) + tenantManager.Run(gCtx) return nil }) @@ -1420,7 +1420,7 @@ func Start(ctx context.Context, app app.App) error { pkgLogger.Child("wh_bc_manager"), ) g.Go(func() error { - bcManager.Start(ctx) + bcManager.Start(gCtx) return nil }) @@ -1451,7 +1451,7 @@ func Start(ctx context.Context, app app.App) error { reporting := application.Features().Reporting.Setup(backendconfig.DefaultBackendConfig) g.Go(misc.WithBugsnagForWarehouse(func() error { - reporting.AddClient(ctx, types.Config{ConnInfo: psqlInfo, ClientName: types.WarehouseReportingClient}) + reporting.AddClient(gCtx, types.Config{ConnInfo: psqlInfo, ClientName: types.WarehouseReportingClient}) return nil })) } @@ -1459,14 +1459,14 @@ func Start(ctx context.Context, app app.App) error { if isStandAlone() && isMaster() { // Report warehouse features g.Go(func() error { - backendconfig.DefaultBackendConfig.WaitForConfig(ctx) + backendconfig.DefaultBackendConfig.WaitForConfig(gCtx) c := controlplane.NewClient( backendconfig.GetConfigBackendURL(), backendconfig.DefaultBackendConfig.Identity(), ) - err := c.SendFeatures(ctx, info.WarehouseComponent.Name, info.WarehouseComponent.Features) + err := c.SendFeatures(gCtx, info.WarehouseComponent.Name, info.WarehouseComponent.Features) if err != nil { pkgLogger.Errorf("error sending warehouse features: %v", err) } @@ -1479,7 +1479,7 @@ func Start(ctx context.Context, app app.App) error { if isSlave() { pkgLogger.Infof("WH: Starting warehouse slave...") g.Go(misc.WithBugsnagForWarehouse(func() error { - return setupSlave(ctx) + return setupSlave(gCtx) })) } @@ -1497,12 +1497,11 @@ func Start(ctx context.Context, app app.App) error { ) g.Go(misc.WithBugsnagForWarehouse(func() error { - return notifier.ClearJobs(ctx) + return notifier.ClearJobs(gCtx) })) g.Go(misc.WithBugsnagForWarehouse(func() error { - err := monitorDestRouters(ctx) - return err + return monitorDestRouters(gCtx) })) archiver := &archive.Archiver{ @@ -1513,7 +1512,7 @@ func Start(ctx context.Context, app app.App) error { Multitenant: tenantManager, } g.Go(misc.WithBugsnagForWarehouse(func() error { - archive.CronArchiver(ctx, archiver) + archive.CronArchiver(gCtx, archiver) return nil })) @@ -1522,7 +1521,7 @@ func Start(ctx context.Context, app app.App) error { pkgLogger.Errorf("WH: Failed to start warehouse api: %v", err) return err } - asyncWh = jobs.InitWarehouseJobsAPI(ctx, dbHandle, ¬ifier) + asyncWh = jobs.InitWarehouseJobsAPI(gCtx, dbHandle, ¬ifier) jobs.WithConfig(asyncWh, config.Default) g.Go(misc.WithBugsnagForWarehouse(func() error { @@ -1531,7 +1530,7 @@ func Start(ctx context.Context, app app.App) error { } g.Go(func() error { - return startWebHandler(ctx) + return startWebHandler(gCtx) }) return g.Wait()