Skip to content

Commit

Permalink
chore: warehouse backend config refactoring (#3602)
Browse files Browse the repository at this point in the history
  • Loading branch information
fracasula committed Aug 7, 2023
1 parent 2467eab commit e48c98e
Show file tree
Hide file tree
Showing 17 changed files with 1,542 additions and 1,166 deletions.
4 changes: 0 additions & 4 deletions integration_test/docker_test/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ var (
disableDestinationWebhookURL string
webhook *whUtil.Recorder
disableDestinationWebhook *whUtil.Recorder
overrideArm64Check bool
writeKey string
workspaceID string
kafkaContainer *kafka.Resource
Expand Down Expand Up @@ -91,9 +90,6 @@ func TestMainFlow(t *testing.T) {
}

hold = os.Getenv("HOLD") == "true"
if os.Getenv("OVERRIDE_ARM64_CHECK") == "1" {
overrideArm64Check = true
}

var tearDownStart time.Time
defer func() {
Expand Down
1 change: 0 additions & 1 deletion runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ func runAllInit() {
warehouse.Init2()
warehouse.Init3()
warehouse.Init4()
warehouse.Init5()
warehouse.Init6()
warehousearchiver.Init()
validations.Init()
Expand Down
28 changes: 17 additions & 11 deletions warehouse/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

type WarehouseAdmin struct{}
type Admin struct {
bcManager *backendConfigManager
logger logger.Logger
}

type QueryInput struct {
DestID string
Expand All @@ -37,12 +40,15 @@ type ConfigurationTestOutput struct {
Error string
}

func Init5() {
admin.RegisterAdminHandler("Warehouse", &WarehouseAdmin{})
func RegisterAdmin(bcManager *backendConfigManager, logger logger.Logger) {
admin.RegisterAdminHandler("Warehouse", &Admin{
bcManager: bcManager,
logger: logger.Child("admin"),
})
}

// TriggerUpload sets uploads to start without delay
func (*WarehouseAdmin) TriggerUpload(off bool, reply *string) error {
func (*Admin) TriggerUpload(off bool, reply *string) error {
startUploadAlways = !off
if off {
*reply = "Turned off explicit warehouse upload triggers.\nWarehouse uploads will continue to be done as per schedule in control plane."
Expand All @@ -53,17 +59,17 @@ func (*WarehouseAdmin) TriggerUpload(off bool, reply *string) error {
}

// Query the underlying warehouse
func (*WarehouseAdmin) Query(s QueryInput, reply *warehouseutils.QueryResult) error {
func (a *Admin) Query(s QueryInput, reply *warehouseutils.QueryResult) error {
if strings.TrimSpace(s.DestID) == "" {
return errors.New("please specify the destination ID to query the warehouse")
}

var warehouse model.Warehouse
srcMap, ok := connectionsMap[s.DestID]
srcMap, ok := a.bcManager.ConnectionSourcesMap(s.DestID)
if !ok {
return errors.New("please specify a valid and existing destination ID")
}

var warehouse model.Warehouse
// use the sourceID-destID connection if sourceID is not empty
if s.SourceID != "" {
w, ok := srcMap[s.SourceID]
Expand Down Expand Up @@ -92,19 +98,19 @@ func (*WarehouseAdmin) Query(s QueryInput, reply *warehouseutils.QueryResult) er
}
defer client.Close()

pkgLogger.Infof(`[WH Admin]: Querying warehouse: %s:%s`, warehouse.Type, warehouse.Destination.ID)
a.logger.Infof(`[WH Admin]: Querying warehouse: %s:%s`, warehouse.Type, warehouse.Destination.ID)
*reply, err = client.Query(s.SQLStatement)
return err
}

// ConfigurationTest test the underlying warehouse destination
func (*WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error {
func (a *Admin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error {
if strings.TrimSpace(s.DestID) == "" {
return errors.New("please specify the destination ID to query the warehouse")
}

var warehouse model.Warehouse
srcMap, ok := connectionsMap[s.DestID]
srcMap, ok := a.bcManager.ConnectionSourcesMap(s.DestID)
if !ok {
return fmt.Errorf("please specify a valid and existing destinationID: %s", s.DestID)
}
Expand All @@ -114,7 +120,7 @@ func (*WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *Config
break
}

pkgLogger.Infof(`[WH Admin]: Validating warehouse destination: %s:%s`, warehouse.Type, warehouse.Destination.ID)
a.logger.Infof(`[WH Admin]: Validating warehouse destination: %s:%s`, warehouse.Type, warehouse.Destination.ID)

destinationValidator := validations.NewDestinationValidator()
res := destinationValidator.Validate(context.TODO(), &warehouse.Destination)
Expand Down
27 changes: 11 additions & 16 deletions warehouse/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type UploadAPIT struct {
warehouseDBHandle *DB
log logger.Logger
connectionManager *controlplane.ConnectionManager
bcManager *backendConfigManager
isMultiWorkspace bool
}

Expand All @@ -119,7 +120,7 @@ const (
NoSuchSync = "No such sync exist"
)

func InitWarehouseAPI(dbHandle *sql.DB, log logger.Logger) error {
func InitWarehouseAPI(dbHandle *sql.DB, bcManager *backendConfigManager, log logger.Logger) error {
connectionToken, tokenType, isMultiWorkspace, err := deployment.GetConnectionToken()
if err != nil {
return err
Expand Down Expand Up @@ -161,6 +162,7 @@ func InitWarehouseAPI(dbHandle *sql.DB, log logger.Logger) error {
})
},
},
bcManager: bcManager,
}
return nil
}
Expand Down Expand Up @@ -528,27 +530,20 @@ func (uploadReq *UploadReq) validateReq() error {
}

func (uploadReq *UploadReq) authorizeSource(sourceID string) bool {
var authorizedSourceIDs []string
var ok bool
sourceIDsByWorkspaceLock.RLock()
defer sourceIDsByWorkspaceLock.RUnlock()
if authorizedSourceIDs, ok = sourceIDsByWorkspace[uploadReq.WorkspaceID]; !ok {
pkgLogger.Errorf(`Did not find sourceId's in workspace:%s. CurrentList:%v`, uploadReq.WorkspaceID, sourceIDsByWorkspace)
currentList := uploadReq.API.bcManager.SourceIDsByWorkspace()
authorizedSourceIDs, ok := currentList[uploadReq.WorkspaceID]
if !ok {
pkgLogger.Errorf(`Could not find sourceID in workspace %q: %v`, uploadReq.WorkspaceID, currentList)
return false
}
pkgLogger.Debugf(`Authorized sourceId's for workspace:%s - %v`, uploadReq.WorkspaceID, authorizedSourceIDs)

pkgLogger.Debugf(`Authorized sourceID for workspace %q: %v`, uploadReq.WorkspaceID, authorizedSourceIDs)
return slices.Contains(authorizedSourceIDs, sourceID)
}

func (uploadsReq *UploadsReq) authorizedSources() (sourceIDs []string) {
sourceIDsByWorkspaceLock.RLock()
defer sourceIDsByWorkspaceLock.RUnlock()
var ok bool
func (uploadsReq *UploadsReq) authorizedSources() []string {
pkgLogger.Debugf(`Getting authorizedSourceIDs for workspace:%s`, uploadsReq.WorkspaceID)
if sourceIDs, ok = sourceIDsByWorkspace[uploadsReq.WorkspaceID]; !ok {
sourceIDs = []string{}
}
return sourceIDs
return uploadsReq.API.bcManager.SourceIDsByWorkspace()[uploadsReq.WorkspaceID]
}

func (uploadsReq *UploadsReq) getUploadsFromDB(ctx context.Context, isMultiWorkspace bool, query string) ([]*proto.WHUploadResponse, int32, error) {
Expand Down
Loading

0 comments on commit e48c98e

Please sign in to comment.