Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: warehouse backend config refactoring #3602

Merged
merged 40 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b1b0c1b
chore: typo
fracasula Jul 7, 2023
fbc204f
chore: backend-config manager
fracasula Jul 7, 2023
ba0427d
fix: retry
fracasula Jul 7, 2023
1deb4ac
fix: api
fracasula Jul 7, 2023
58f4dfa
fix: admin
fracasula Jul 7, 2023
451348c
chore: updating tests
fracasula Jul 10, 2023
87ff175
chore: removing unused attributes
fracasula Jul 10, 2023
f47aaab
chore: bigquery test fix
fracasula Jul 10, 2023
301e17a
chore: linting
fracasula Jul 10, 2023
0bc99d5
chore: moving constructor
fracasula Jul 10, 2023
44aeb08
chore: refactor wh grpc test
fracasula Jul 18, 2023
7b95baa
chore: removing duplicated code in test
fracasula Jul 19, 2023
cdd1bc8
chore: cleaning up
fracasula Jul 19, 2023
b8d36a0
chore: ssh tunneling and logger
fracasula Jul 19, 2023
8dd4296
chore: removing unused client
fracasula Jul 19, 2023
ccf5134
chore: better routine management
fracasula Jul 19, 2023
a5e75a2
chore: rename variable
fracasula Jul 19, 2023
7e3d25f
chore: removing TODOs
fracasula Jul 19, 2023
2a30339
chore: more efficient workerIdentifier()
fracasula Jul 19, 2023
c70b229
chore: adding TODOs
fracasula Jul 19, 2023
d79319d
chore: adding info to backendConfigSubscriber
fracasula Jul 19, 2023
39cd70b
chore: optimizing concurrent code
fracasula Jul 20, 2023
0ad24f7
chore: using logger
fracasula Jul 21, 2023
ee7b1e1
chore: initialConfigFetched
fracasula Jul 21, 2023
5c756c7
chore: moving var
fracasula Jul 21, 2023
642dd40
chore: TODO
fracasula Jul 21, 2023
b24e638
chore: wrapped db handle
fracasula Jul 21, 2023
077abc3
chore: moving workerChannelMap initialization
fracasula Jul 21, 2023
8f9f564
chore: removing unused field
fracasula Jul 21, 2023
009d557
chore: removing unused variable
fracasula Jul 26, 2023
ca754ee
chore: go-kit v0.15.4
fracasula Jul 26, 2023
b4305f3
fix: alteredSchemaInAtLeastOneTable data race
fracasula Jul 26, 2023
fcd31db
chore: send all warehouses to subscriptions
fracasula Jul 26, 2023
53d0def
chore: formatting
fracasula Jul 26, 2023
99be513
chore: removing unnecessary logic
fracasula Jul 27, 2023
1f0f024
chore: reformat
fracasula Jul 27, 2023
3fc87a8
chore: added test case
achettyiitr Jul 31, 2023
fe6d88a
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Jul 31, 2023
e3a6c6e
chore: added test case for many subscribers data race
achettyiitr Jul 31, 2023
1c8e8fe
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Aug 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ func runAllInit() {
warehouse.Init2()
warehouse.Init3()
warehouse.Init4()
warehouse.Init5()
warehouse.Init6()
warehousearchiver.Init()
validations.Init()
Expand Down
26 changes: 16 additions & 10 deletions warehouse/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

type WarehouseAdmin struct{}
type Admin struct {
bcManager *backendConfigManager
logger logger.Logger
}

type QueryInput struct {
DestID string
Expand All @@ -37,12 +40,15 @@ type ConfigurationTestOutput struct {
Error string
}

func Init5() {
admin.RegisterAdminHandler("Warehouse", &WarehouseAdmin{})
func RegisterAdmin(bcManager *backendConfigManager, logger logger.Logger) {
admin.RegisterAdminHandler("Warehouse", &Admin{
bcManager: bcManager,
logger: logger.Child("admin"),
})
}

// TriggerUpload sets uploads to start without delay
func (*WarehouseAdmin) TriggerUpload(off bool, reply *string) error {
func (*Admin) TriggerUpload(off bool, reply *string) error {
startUploadAlways = !off
if off {
*reply = "Turned off explicit warehouse upload triggers.\nWarehouse uploads will continue to be done as per schedule in control plane."
Expand All @@ -53,12 +59,12 @@ func (*WarehouseAdmin) TriggerUpload(off bool, reply *string) error {
}

// Query the underlying warehouse
func (*WarehouseAdmin) Query(s QueryInput, reply *warehouseutils.QueryResult) error {
func (a *Admin) Query(s QueryInput, reply *warehouseutils.QueryResult) error {
if strings.TrimSpace(s.DestID) == "" {
return errors.New("please specify the destination ID to query the warehouse")
}

srcMap, ok := bcManager.ConnectionSourcesMap(s.DestID) // TODO remove global variable
srcMap, ok := a.bcManager.ConnectionSourcesMap(s.DestID)
if !ok {
return errors.New("please specify a valid and existing destination ID")
}
Expand Down Expand Up @@ -92,19 +98,19 @@ func (*WarehouseAdmin) Query(s QueryInput, reply *warehouseutils.QueryResult) er
}
defer client.Close()

pkgLogger.Infof(`[WH Admin]: Querying warehouse: %s:%s`, warehouse.Type, warehouse.Destination.ID)
a.logger.Infof(`[WH Admin]: Querying warehouse: %s:%s`, warehouse.Type, warehouse.Destination.ID)
*reply, err = client.Query(s.SQLStatement)
return err
}

// ConfigurationTest test the underlying warehouse destination
func (*WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error {
func (a *Admin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error {
if strings.TrimSpace(s.DestID) == "" {
return errors.New("please specify the destination ID to query the warehouse")
}

var warehouse model.Warehouse
srcMap, ok := bcManager.ConnectionSourcesMap(s.DestID) // TODO remove global variable
srcMap, ok := a.bcManager.ConnectionSourcesMap(s.DestID)
if !ok {
return fmt.Errorf("please specify a valid and existing destinationID: %s", s.DestID)
}
Expand All @@ -114,7 +120,7 @@ func (*WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *Config
break
}

pkgLogger.Infof(`[WH Admin]: Validating warehouse destination: %s:%s`, warehouse.Type, warehouse.Destination.ID)
a.logger.Infof(`[WH Admin]: Validating warehouse destination: %s:%s`, warehouse.Type, warehouse.Destination.ID)

destinationValidator := validations.NewDestinationValidator()
res := destinationValidator.Validate(context.TODO(), &warehouse.Destination)
Expand Down
10 changes: 6 additions & 4 deletions warehouse/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type UploadAPIT struct {
warehouseDBHandle *DB
log logger.Logger
connectionManager *controlplane.ConnectionManager
bcManager *backendConfigManager
isMultiWorkspace bool
}

Expand All @@ -119,7 +120,7 @@ const (
NoSuchSync = "No such sync exist"
)

func InitWarehouseAPI(dbHandle *sql.DB, log logger.Logger) error {
func InitWarehouseAPI(dbHandle *sql.DB, bcManager *backendConfigManager, log logger.Logger) error {
connectionToken, tokenType, isMultiWorkspace, err := deployment.GetConnectionToken()
if err != nil {
return err
Expand Down Expand Up @@ -161,6 +162,7 @@ func InitWarehouseAPI(dbHandle *sql.DB, log logger.Logger) error {
})
},
},
bcManager: bcManager,
}
return nil
}
Expand Down Expand Up @@ -528,8 +530,8 @@ func (uploadReq *UploadReq) validateReq() error {
}

func (uploadReq *UploadReq) authorizeSource(sourceID string) bool {
currentList := bcManager.SourceIDsByWorkspace()
authorizedSourceIDs, ok := currentList[uploadReq.WorkspaceID] // TODO remove global variable
currentList := uploadReq.API.bcManager.SourceIDsByWorkspace()
authorizedSourceIDs, ok := currentList[uploadReq.WorkspaceID]
if !ok {
pkgLogger.Errorf(`Could not find sourceID in workspace %q: %v`, uploadReq.WorkspaceID, currentList)
return false
Expand All @@ -541,7 +543,7 @@ func (uploadReq *UploadReq) authorizeSource(sourceID string) bool {

func (uploadsReq *UploadsReq) authorizedSources() []string {
pkgLogger.Debugf(`Getting authorizedSourceIDs for workspace:%s`, uploadsReq.WorkspaceID)
return bcManager.SourceIDsByWorkspace()[uploadsReq.WorkspaceID] // TODO remove global variable
return uploadsReq.API.bcManager.SourceIDsByWorkspace()[uploadsReq.WorkspaceID]
}

func (uploadsReq *UploadsReq) getUploadsFromDB(ctx context.Context, isMultiWorkspace bool, query string) ([]*proto.WHUploadResponse, int32, error) {
Expand Down
28 changes: 15 additions & 13 deletions warehouse/backend_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// TODO: add tests
func newBackendConfigManager(
c *config.Config,
db *sqlquerywrapper.DB,
Expand Down Expand Up @@ -106,6 +105,12 @@ func (s *backendConfigManager) Subscribe(ctx context.Context) <-chan []model.War
ch := make(chan []model.Warehouse, 10)
s.subscriptions = append(s.subscriptions, ch)

s.warehousesMu.Lock()
if len(s.warehouses) > 0 {
ch <- s.warehouses
}
s.warehousesMu.Unlock()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@achettyiitr this version was better; now we're holding the lock for the whole duration of the Subscribe call. I think we should put this code back.

Copy link
Member

@achettyiitr achettyiitr Aug 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason we are adding this is that there is a race condition: if there are no subscribers yet when we receive the backend config, a subscriber that registers afterwards will not get anything until the next time the backend config changes.
So, during the subscription, we also send the last copy of the data.


go func() {
<-ctx.Done()

Expand All @@ -126,26 +131,23 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string]
})

var (
warehouses []model.Warehouse
connectionFlags backendconfig.ConnectionFlags
sourceIDsByWorkspaceTemp = make(map[string][]string)
warehouses []model.Warehouse
connectionFlags backendconfig.ConnectionFlags
sourceIDsByWorkspace = make(map[string][]string)
)

for workspaceID, wConfig := range data {
// the last connection flags should be enough, since they are all the same in multi-workspace environments
connectionFlags = wConfig.ConnectionFlags
// map source IDs to workspace IDs
workspaceBySourceIDs := make(map[string]string)

for _, source := range wConfig.Sources {
workspaceBySourceIDs[source.ID] = workspaceID

if _, ok := sourceIDsByWorkspaceTemp[workspaceID]; !ok {
sourceIDsByWorkspaceTemp[workspaceID] = make([]string, 0, len(wConfig.Sources))
if _, ok := sourceIDsByWorkspace[workspaceID]; !ok {
sourceIDsByWorkspace[workspaceID] = make([]string, 0, len(wConfig.Sources))
}
sourceIDsByWorkspaceTemp[workspaceID] = append(sourceIDsByWorkspaceTemp[workspaceID], source.ID)
sourceIDsByWorkspace[workspaceID] = append(sourceIDsByWorkspace[workspaceID], source.ID)

for _, destination := range source.Destinations {
if !slices.Contains(whutils.WarehouseDestinations, destination.DestinationDefinition.Name) {
if _, ok := warehouseutils.WarehouseDestinationMap[destination.DestinationDefinition.Name]; !ok {
s.logger.Debugf("Not a warehouse destination, skipping %s", destination.DestinationDefinition.Name)
continue
}
Expand Down Expand Up @@ -204,7 +206,7 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string]
s.warehousesMu.Unlock()

s.sourceIDsByWorkspaceMu.Lock()
s.sourceIDsByWorkspace = sourceIDsByWorkspaceTemp
s.sourceIDsByWorkspace = sourceIDsByWorkspace
s.sourceIDsByWorkspaceMu.Unlock()

s.subscriptionsMu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion warehouse/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (retryReq *RetryRequest) UploadsToRetry(ctx context.Context) (response Retr
}

func (retryReq *RetryRequest) getSourceIDs() []string {
return bcManager.SourceIDsByWorkspace()[retryReq.WorkspaceID] // TODO remove global variable
return retryReq.API.bcManager.SourceIDsByWorkspace()[retryReq.WorkspaceID]
}

func (retryReq *RetryRequest) clausesQuery(sourceIDs []string) []FilterClause {
Expand Down
1 change: 0 additions & 1 deletion warehouse/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ func initWarehouse() {
Init2()
Init3()
Init4()
Init5()
}

var _ = Describe("Scheduling", func() {
Expand Down
12 changes: 8 additions & 4 deletions warehouse/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ var (
runningMode string
uploadStatusTrackFrequency time.Duration
uploadAllocatorSleep time.Duration
waitForConfig time.Duration
waitForWorkerSleep time.Duration
ShouldForceSetLowerVersion bool
maxParallelJobCreation int
Expand Down Expand Up @@ -181,7 +180,6 @@ func loadConfig() {
runningMode = config.GetString("Warehouse.runningMode", "")
config.RegisterDurationConfigVariable(30, &uploadStatusTrackFrequency, false, time.Minute, []string{"Warehouse.uploadStatusTrackFrequency", "Warehouse.uploadStatusTrackFrequencyInMin"}...)
config.RegisterDurationConfigVariable(5, &uploadAllocatorSleep, false, time.Second, []string{"Warehouse.uploadAllocatorSleep", "Warehouse.uploadAllocatorSleepInS"}...)
config.RegisterDurationConfigVariable(5, &waitForConfig, false, time.Second, []string{"Warehouse.waitForConfig", "Warehouse.waitForConfigInS"}...)
config.RegisterDurationConfigVariable(5, &waitForWorkerSleep, false, time.Second, []string{"Warehouse.waitForWorkerSleep", "Warehouse.waitForWorkerSleepInS"}...)
config.RegisterBoolConfigVariable(true, &ShouldForceSetLowerVersion, false, "SQLMigrator.forceSetLowerVersion")
config.RegisterIntConfigVariable(8, &maxParallelJobCreation, true, 1, "Warehouse.maxParallelJobCreation")
Expand Down Expand Up @@ -258,6 +256,10 @@ func (wh *HandleT) backendConfigSubscriber(ctx context.Context) {
for warehouses := range bcManager.Subscribe(ctx) {
wh.Logger.Info(`Received updated workspace config`)

warehouses = lo.Filter(warehouses, func(warehouse model.Warehouse, _ int) bool {
return warehouse.Destination.DestinationDefinition.Name == wh.destType
})
Comment on lines +258 to +260
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@achettyiitr we could also do this to save ourselves the next for loop:

warehouses = lo.Filter(warehouses, func(warehouse model.Warehouse, _ int) bool {
	if warehouse.Destination.DestinationDefinition.Name != wh.destType {
		return false
	}

	if warehouseutils.IDResolutionEnabled() &&
		slices.Contains(warehouseutils.IdentityEnabledWarehouses, wh.destType) {
		wh.setupIdentityTables(ctx, warehouse)
		if shouldPopulateHistoricIdentities && warehouse.Destination.Enabled {
			// non-blocking populate historic identities
			wh.populateHistoricIdentities(ctx, warehouse)
		}
	}

	return true
})


wh.configSubscriberLock.Lock()
wh.warehouses = warehouses
if wh.workspaceBySourceIDs == nil {
Expand Down Expand Up @@ -1453,11 +1455,13 @@ func Start(ctx context.Context, app app.App) error {
bcManager.Start(ctx)
})

RegisterAdmin(bcManager, pkgLogger)

runningMode := config.GetString("Warehouse.runningMode", "")
if runningMode == DegradedMode {
pkgLogger.Infof("WH: Running warehouse service in degraded mode...")
if isMaster() {
err := InitWarehouseAPI(dbHandle, pkgLogger.Child("upload_api"))
err := InitWarehouseAPI(dbHandle, bcManager, pkgLogger.Child("upload_api"))
if err != nil {
pkgLogger.Errorf("WH: Failed to start warehouse api: %v", err)
return err
Expand Down Expand Up @@ -1554,7 +1558,7 @@ func Start(ctx context.Context, app app.App) error {
return nil
}))

err := InitWarehouseAPI(dbHandle, pkgLogger.Child("upload_api"))
err := InitWarehouseAPI(dbHandle, bcManager, pkgLogger.Child("upload_api"))
if err != nil {
pkgLogger.Errorf("WH: Failed to start warehouse api: %v", err)
return err
Expand Down
1 change: 0 additions & 1 deletion warehouse/warehouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func initWarehouse() {
Init2()
Init3()
Init4()
Init5()
validations.Init()
misc.Init()
}
Expand Down
2 changes: 1 addition & 1 deletion warehouse/warehousegrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ func setupWarehouseGRPCTest(
}
resetBackendConfigManager()

require.NoError(t, InitWarehouseAPI(pgResource.DB, logger.NOP))
require.NoError(t, InitWarehouseAPI(pgResource.DB, bcManager, logger.NOP))

return pgResource, resetBackendConfigManager
}
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.