Skip to content

Commit

Permalink
chore: added test case for many subscribers data race
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Aug 1, 2023
1 parent fe6d88a commit 6301bf8
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 258 deletions.
97 changes: 67 additions & 30 deletions warehouse/backend_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package warehouse
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/samber/lo"
"github.com/rudderlabs/rudder-server/warehouse/logfield"

"golang.org/x/exp/slices"
"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
Expand All @@ -16,7 +18,6 @@ import (
"github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

Expand Down Expand Up @@ -101,7 +102,6 @@ func (s *backendConfigManager) Subscribe(ctx context.Context) <-chan []model.War
s.subscriptionsMu.Lock()
defer s.subscriptionsMu.Unlock()

idx := len(s.subscriptions)
ch := make(chan []model.Warehouse, 10)
s.subscriptions = append(s.subscriptions, ch)

Expand All @@ -119,7 +119,12 @@ func (s *backendConfigManager) Subscribe(ctx context.Context) <-chan []model.War

close(ch)

s.subscriptions = append(s.subscriptions[:idx], s.subscriptions[idx+1:]...)
for i, item := range s.subscriptions {
if item == ch {
s.subscriptions = append(s.subscriptions[:i], s.subscriptions[i+1:]...)
return
}
}
}()

return ch
Expand All @@ -134,6 +139,7 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string]
warehouses []model.Warehouse
connectionFlags backendconfig.ConnectionFlags
sourceIDsByWorkspace = make(map[string][]string)
connectionsMap = make(map[string]map[string]model.Warehouse)
)

for workspaceID, wConfig := range data {
Expand All @@ -147,17 +153,11 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string]
sourceIDsByWorkspace[workspaceID] = append(sourceIDsByWorkspace[workspaceID], source.ID)

for _, destination := range source.Destinations {
if _, ok := warehouseutils.WarehouseDestinationMap[destination.DestinationDefinition.Name]; !ok {
if _, ok := whutils.WarehouseDestinationMap[destination.DestinationDefinition.Name]; !ok {
s.logger.Debugf("Not a warehouse destination, skipping %s", destination.DestinationDefinition.Name)
continue
}

wh := &HandleT{
dbHandle: s.db,
whSchemaRepo: s.schema,
conf: s.conf,
destType: destination.DestinationDefinition.Name,
}
if s.internalControlPlaneClient != nil {
destination = s.attachSSHTunnellingInfo(ctx, destination)
}
Expand All @@ -166,41 +166,35 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string]
Source: source,
WorkspaceID: workspaceID,
Destination: destination,
Type: wh.destType,
Namespace: wh.getNamespace(ctx, source, destination),
Identifier: whutils.GetWarehouseIdentifier(wh.destType, source.ID, destination.ID),
Type: destination.DestinationDefinition.Name,
Namespace: s.namespace(ctx, source, destination),
Identifier: whutils.GetWarehouseIdentifier(destination.DestinationDefinition.Name, source.ID, destination.ID),
}

warehouses = append(warehouses, warehouse)

s.connectionsMapMu.Lock()
if _, ok := s.connectionsMap[destination.ID]; !ok {
s.connectionsMap[destination.ID] = make(map[string]model.Warehouse)
if _, ok := connectionsMap[destination.ID]; !ok {
connectionsMap[destination.ID] = make(map[string]model.Warehouse)
}
s.connectionsMap[destination.ID][source.ID] = warehouse
s.connectionsMapMu.Unlock()
connectionsMap[destination.ID][source.ID] = warehouse

if destination.Config["sslMode"] == "verify-ca" {
if err := whutils.WriteSSLKeys(destination); err.IsError() {
s.logger.Error(err.Error())
persistSSLFileErrorStat(
workspaceID, wh.destType, destination.Name, destination.ID,
workspaceID, destination.DestinationDefinition.Name, destination.Name, destination.ID,
source.Name, source.ID, err.GetErrTag(),
)
}

Check warning on line 188 in warehouse/backend_config.go

View check run for this annotation

Codecov / codecov/patch

warehouse/backend_config.go#L182-L188

Added lines #L182 - L188 were not covered by tests
}

if whutils.IDResolutionEnabled() && slices.Contains(whutils.IdentityEnabledWarehouses, warehouse.Type) {
wh.setupIdentityTables(ctx, warehouse)
if shouldPopulateHistoricIdentities && warehouse.Destination.Enabled {
// non-blocking populate historic identities
wh.populateHistoricIdentities(ctx, warehouse)
}
}
}
}
}

s.connectionsMapMu.Lock()
s.connectionsMap = connectionsMap
s.connectionsMapMu.Unlock()

s.warehousesMu.Lock()
s.warehouses = warehouses // TODO how is this used? because we are duplicating data
s.warehousesMu.Unlock()
Expand All @@ -222,6 +216,49 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string]
}
}

// namespace gives the namespace for the warehouse in the following order
// 1. user set name from destinationConfig
// 2. from existing record in wh_schemas with same source + dest combo
// 3. convert source name
func (s *backendConfigManager) namespace(ctx context.Context, source backendconfig.SourceT, destination backendconfig.DestinationT) string {
destType := destination.DestinationDefinition.Name
destConfig := destination.Config

if destType == whutils.CLICKHOUSE {
if database, ok := destConfig["database"].(string); ok {
return database
}
return "rudder"
}

if destConfig["namespace"] != nil {
namespace, _ := destConfig["namespace"].(string)
if len(strings.TrimSpace(namespace)) > 0 {
return whutils.ToProviderCase(destType, whutils.ToSafeNamespace(destType, namespace))
}
}

namespacePrefix := s.conf.GetString(fmt.Sprintf("Warehouse.%s.customDatasetPrefix", whutils.WHDestNameMap[destType]), "")
if namespacePrefix != "" {
return whutils.ToProviderCase(destType, whutils.ToSafeNamespace(destType, fmt.Sprintf(`%s_%s`, namespacePrefix, source.Name)))
}

namespace, err := s.schema.GetNamespace(ctx, source.ID, destination.ID)
if err != nil {
s.logger.Errorw("getting namespace",
logfield.SourceID, source.ID,
logfield.DestinationID, destination.ID,
logfield.DestinationType, destination.DestinationDefinition.Name,
logfield.WorkspaceID, destination.WorkspaceID,
)
return ""
}

Check warning on line 255 in warehouse/backend_config.go

View check run for this annotation

Codecov / codecov/patch

warehouse/backend_config.go#L248-L255

Added lines #L248 - L255 were not covered by tests
if namespace == "" {
return whutils.ToProviderCase(destType, whutils.ToSafeNamespace(destType, source.Name))
}
return namespace
}

func (s *backendConfigManager) IsInitialized() bool {
select {
case <-s.initialConfigFetched:
Expand Down Expand Up @@ -275,7 +312,7 @@ func (s *backendConfigManager) attachSSHTunnellingInfo(
upstream backendconfig.DestinationT,
) backendconfig.DestinationT {
// at destination level, do we have tunnelling enabled.
if tunnelEnabled := warehouseutils.ReadAsBool("useSSH", upstream.Config); !tunnelEnabled {
if tunnelEnabled := whutils.ReadAsBool("useSSH", upstream.Config); !tunnelEnabled {
return upstream
}

Expand Down
Loading

0 comments on commit 6301bf8

Please sign in to comment.