Skip to content

Commit

Permalink
chore: added test case for many subscribers data race
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Aug 1, 2023
1 parent fe6d88a commit e3a6c6e
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 275 deletions.
112 changes: 74 additions & 38 deletions warehouse/backend_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ package warehouse
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/samber/lo"
"github.com/rudderlabs/rudder-server/warehouse/multitenant"

"github.com/rudderlabs/rudder-server/warehouse/logfield"

"golang.org/x/exp/slices"
"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
Expand All @@ -16,30 +20,26 @@ import (
"github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

func newBackendConfigManager(
c *config.Config,
db *sqlquerywrapper.DB,
bc backendconfig.BackendConfig,
tenantManager *multitenant.Manager,
log logger.Logger,
) *backendConfigManager {
if c == nil {
c = config.Default
}
if bc == nil {
bc = backendconfig.DefaultBackendConfig
}
if log == nil {
log = logger.NOP
}
bcm := &backendConfigManager{
conf: c,
db: db,
schema: repo.NewWHSchemas(db),
backendConfig: bc,
tenantManager: tenantManager,
logger: log,
initialConfigFetched: make(chan struct{}),
connectionsMap: make(map[string]map[string]model.Warehouse),
Expand All @@ -61,7 +61,7 @@ type backendConfigManager struct {
conf *config.Config
db *sqlquerywrapper.DB
schema *repo.WHSchema
backendConfig backendconfig.BackendConfig
tenantManager *multitenant.Manager
internalControlPlaneClient cpclient.InternalControlPlane
logger logger.Logger

Expand All @@ -83,7 +83,7 @@ type backendConfigManager struct {
}

func (s *backendConfigManager) Start(ctx context.Context) {
ch := s.backendConfig.Subscribe(ctx, backendconfig.TopicBackendConfig)
ch := s.tenantManager.WatchConfig(ctx)
for {
select {
case <-ctx.Done():
Expand All @@ -92,7 +92,7 @@ func (s *backendConfigManager) Start(ctx context.Context) {
if !ok {
return
}
s.processData(ctx, data.Data.(map[string]backendconfig.ConfigT))
s.processData(ctx, data)
}
}
}
Expand All @@ -101,7 +101,6 @@ func (s *backendConfigManager) Subscribe(ctx context.Context) <-chan []model.War
s.subscriptionsMu.Lock()
defer s.subscriptionsMu.Unlock()

idx := len(s.subscriptions)
ch := make(chan []model.Warehouse, 10)
s.subscriptions = append(s.subscriptions, ch)

Expand All @@ -119,7 +118,12 @@ func (s *backendConfigManager) Subscribe(ctx context.Context) <-chan []model.War

close(ch)

s.subscriptions = append(s.subscriptions[:idx], s.subscriptions[idx+1:]...)
for i, item := range s.subscriptions {
if item == ch {
s.subscriptions = append(s.subscriptions[:i], s.subscriptions[i+1:]...)
return
}
}
}()

return ch
Expand All @@ -134,6 +138,7 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string]
warehouses []model.Warehouse
connectionFlags backendconfig.ConnectionFlags
sourceIDsByWorkspace = make(map[string][]string)
connectionsMap = make(map[string]map[string]model.Warehouse)
)

for workspaceID, wConfig := range data {
Expand All @@ -147,17 +152,11 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string]
sourceIDsByWorkspace[workspaceID] = append(sourceIDsByWorkspace[workspaceID], source.ID)

for _, destination := range source.Destinations {
if _, ok := warehouseutils.WarehouseDestinationMap[destination.DestinationDefinition.Name]; !ok {
if _, ok := whutils.WarehouseDestinationMap[destination.DestinationDefinition.Name]; !ok {
s.logger.Debugf("Not a warehouse destination, skipping %s", destination.DestinationDefinition.Name)
continue
}

wh := &HandleT{
dbHandle: s.db,
whSchemaRepo: s.schema,
conf: s.conf,
destType: destination.DestinationDefinition.Name,
}
if s.internalControlPlaneClient != nil {
destination = s.attachSSHTunnellingInfo(ctx, destination)
}
Expand All @@ -166,41 +165,35 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string]
Source: source,
WorkspaceID: workspaceID,
Destination: destination,
Type: wh.destType,
Namespace: wh.getNamespace(ctx, source, destination),
Identifier: whutils.GetWarehouseIdentifier(wh.destType, source.ID, destination.ID),
Type: destination.DestinationDefinition.Name,
Namespace: s.namespace(ctx, source, destination),
Identifier: whutils.GetWarehouseIdentifier(destination.DestinationDefinition.Name, source.ID, destination.ID),
}

warehouses = append(warehouses, warehouse)

s.connectionsMapMu.Lock()
if _, ok := s.connectionsMap[destination.ID]; !ok {
s.connectionsMap[destination.ID] = make(map[string]model.Warehouse)
if _, ok := connectionsMap[destination.ID]; !ok {
connectionsMap[destination.ID] = make(map[string]model.Warehouse)
}
s.connectionsMap[destination.ID][source.ID] = warehouse
s.connectionsMapMu.Unlock()
connectionsMap[destination.ID][source.ID] = warehouse

if destination.Config["sslMode"] == "verify-ca" {
if err := whutils.WriteSSLKeys(destination); err.IsError() {
s.logger.Error(err.Error())
persistSSLFileErrorStat(
workspaceID, wh.destType, destination.Name, destination.ID,
workspaceID, destination.DestinationDefinition.Name, destination.Name, destination.ID,
source.Name, source.ID, err.GetErrTag(),
)
}
}

if whutils.IDResolutionEnabled() && slices.Contains(whutils.IdentityEnabledWarehouses, warehouse.Type) {
wh.setupIdentityTables(ctx, warehouse)
if shouldPopulateHistoricIdentities && warehouse.Destination.Enabled {
// non-blocking populate historic identities
wh.populateHistoricIdentities(ctx, warehouse)
}
}
}
}
}

s.connectionsMapMu.Lock()
s.connectionsMap = connectionsMap
s.connectionsMapMu.Unlock()

s.warehousesMu.Lock()
s.warehouses = warehouses // TODO how is this used? because we are duplicating data
s.warehousesMu.Unlock()
Expand All @@ -222,6 +215,49 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string]
}
}

// namespace resolves the warehouse namespace for a source/destination pair.
//
// The first rule that yields a value wins:
//  1. ClickHouse destinations: the "database" entry of the destination config
//     when it is a string, otherwise the literal "rudder".
//  2. A non-blank "namespace" string in the destination config.
//  3. The Warehouse.<dest>.customDatasetPrefix config value (when non-empty),
//     joined with the source name as "<prefix>_<source name>".
//  4. The namespace returned by the schema repository for this
//     source + destination combination.
//  5. The source name.
//
// Values produced by rules 2, 3 and 5 are passed through
// whutils.ToSafeNamespace and whutils.ToProviderCase before being returned.
// An empty string is returned when the schema repository lookup fails; the
// failure is logged together with the underlying error.
func (s *backendConfigManager) namespace(ctx context.Context, source backendconfig.SourceT, destination backendconfig.DestinationT) string {
	destType := destination.DestinationDefinition.Name
	destConfig := destination.Config

	if destType == whutils.CLICKHOUSE {
		if database, ok := destConfig["database"].(string); ok {
			return database
		}
		return "rudder"
	}

	// A missing key, a nil value or a non-string value all fail the assertion,
	// so a single comma-ok check covers every "not configured" case.
	if configured, ok := destConfig["namespace"].(string); ok && strings.TrimSpace(configured) != "" {
		return whutils.ToProviderCase(destType, whutils.ToSafeNamespace(destType, configured))
	}

	namespacePrefix := s.conf.GetString(fmt.Sprintf("Warehouse.%s.customDatasetPrefix", whutils.WHDestNameMap[destType]), "")
	if namespacePrefix != "" {
		return whutils.ToProviderCase(destType, whutils.ToSafeNamespace(destType, fmt.Sprintf(`%s_%s`, namespacePrefix, source.Name)))
	}

	namespace, err := s.schema.GetNamespace(ctx, source.ID, destination.ID)
	if err != nil {
		// Include the error itself: without it the log line says that the
		// lookup failed but not why.
		s.logger.Errorw("getting namespace",
			logfield.SourceID, source.ID,
			logfield.DestinationID, destination.ID,
			logfield.DestinationType, destination.DestinationDefinition.Name,
			logfield.WorkspaceID, destination.WorkspaceID,
			"error", err.Error(),
		)
		return ""
	}
	if namespace == "" {
		return whutils.ToProviderCase(destType, whutils.ToSafeNamespace(destType, source.Name))
	}
	return namespace
}

func (s *backendConfigManager) IsInitialized() bool {
select {
case <-s.initialConfigFetched:
Expand Down Expand Up @@ -275,7 +311,7 @@ func (s *backendConfigManager) attachSSHTunnellingInfo(
upstream backendconfig.DestinationT,
) backendconfig.DestinationT {
// at destination level, do we have tunnelling enabled.
if tunnelEnabled := warehouseutils.ReadAsBool("useSSH", upstream.Config); !tunnelEnabled {
if tunnelEnabled := whutils.ReadAsBool("useSSH", upstream.Config); !tunnelEnabled {
return upstream
}

Expand Down
Loading

0 comments on commit e3a6c6e

Please sign in to comment.