Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: warehouse backend config refactoring #3602

Merged
merged 40 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
40 commits
Select commit. Hold shift + click to select a range.
b1b0c1b
chore: typo
fracasula Jul 7, 2023
fbc204f
chore: backend-config manager
fracasula Jul 7, 2023
ba0427d
fix: retry
fracasula Jul 7, 2023
1deb4ac
fix: api
fracasula Jul 7, 2023
58f4dfa
fix: admin
fracasula Jul 7, 2023
451348c
chore: updating tests
fracasula Jul 10, 2023
87ff175
chore: removing unused attributes
fracasula Jul 10, 2023
f47aaab
chore: bigquery test fix
fracasula Jul 10, 2023
301e17a
chore: linting
fracasula Jul 10, 2023
0bc99d5
chore: moving constructor
fracasula Jul 10, 2023
44aeb08
chore: refactor wh grpc test
fracasula Jul 18, 2023
7b95baa
chore: removing duplicated code in test
fracasula Jul 19, 2023
cdd1bc8
chore: cleaning up
fracasula Jul 19, 2023
b8d36a0
chore: ssh tunneling and logger
fracasula Jul 19, 2023
8dd4296
chore: removing unused client
fracasula Jul 19, 2023
ccf5134
chore: better routine management
fracasula Jul 19, 2023
a5e75a2
chore: rename variable
fracasula Jul 19, 2023
7e3d25f
chore: removing TODOs
fracasula Jul 19, 2023
2a30339
chore: more efficient workerIdentifier()
fracasula Jul 19, 2023
c70b229
chore: adding TODOs
fracasula Jul 19, 2023
d79319d
chore: adding info to backendConfigSubscriber
fracasula Jul 19, 2023
39cd70b
chore: optimizing concurrent code
fracasula Jul 20, 2023
0ad24f7
chore: using logger
fracasula Jul 21, 2023
ee7b1e1
chore: initialConfigFetched
fracasula Jul 21, 2023
5c756c7
chore: moving var
fracasula Jul 21, 2023
642dd40
chore: TODO
fracasula Jul 21, 2023
b24e638
chore: wrapped db handle
fracasula Jul 21, 2023
077abc3
chore: moving workerChannelMap initialization
fracasula Jul 21, 2023
8f9f564
chore: removing unused field
fracasula Jul 21, 2023
009d557
chore: removing unused variable
fracasula Jul 26, 2023
ca754ee
chore: go-kit v0.15.4
fracasula Jul 26, 2023
b4305f3
fix: alteredSchemaInAtLeastOneTable data race
fracasula Jul 26, 2023
fcd31db
chore: send all warehouses to subscriptions
fracasula Jul 26, 2023
53d0def
chore: formatting
fracasula Jul 26, 2023
99be513
chore: removing unnecessary logic
fracasula Jul 27, 2023
1f0f024
chore: reformat
fracasula Jul 27, 2023
3fc87a8
chore: added test case
achettyiitr Jul 31, 2023
fe6d88a
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Jul 31, 2023
e3a6c6e
chore: added test case for many subscribers data race
achettyiitr Jul 31, 2023
1c8e8fe
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Aug 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions integration_test/docker_test/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ var (
disableDestinationWebhookURL string
webhook *whUtil.Recorder
disableDestinationWebhook *whUtil.Recorder
overrideArm64Check bool
writeKey string
workspaceID string
kafkaContainer *kafka.Resource
Expand Down Expand Up @@ -91,9 +90,6 @@ func TestMainFlow(t *testing.T) {
}

hold = os.Getenv("HOLD") == "true"
if os.Getenv("OVERRIDE_ARM64_CHECK") == "1" {
overrideArm64Check = true
}

var tearDownStart time.Time
defer func() {
Expand Down
1 change: 0 additions & 1 deletion runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ func runAllInit() {
warehouse.Init2()
warehouse.Init3()
warehouse.Init4()
warehouse.Init5()
warehouse.Init6()
warehousearchiver.Init()
validations.Init()
Expand Down
28 changes: 17 additions & 11 deletions warehouse/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

type WarehouseAdmin struct{}
type Admin struct {
bcManager *backendConfigManager
logger logger.Logger
}

type QueryInput struct {
DestID string
Expand All @@ -37,12 +40,15 @@ type ConfigurationTestOutput struct {
Error string
}

func Init5() {
admin.RegisterAdminHandler("Warehouse", &WarehouseAdmin{})
func RegisterAdmin(bcManager *backendConfigManager, logger logger.Logger) {
admin.RegisterAdminHandler("Warehouse", &Admin{
bcManager: bcManager,
logger: logger.Child("admin"),
})
}

// TriggerUpload sets uploads to start without delay
func (*WarehouseAdmin) TriggerUpload(off bool, reply *string) error {
func (*Admin) TriggerUpload(off bool, reply *string) error {
startUploadAlways = !off
if off {
*reply = "Turned off explicit warehouse upload triggers.\nWarehouse uploads will continue to be done as per schedule in control plane."
Expand All @@ -53,17 +59,17 @@ func (*WarehouseAdmin) TriggerUpload(off bool, reply *string) error {
}

// Query the underlying warehouse
func (*WarehouseAdmin) Query(s QueryInput, reply *warehouseutils.QueryResult) error {
func (a *Admin) Query(s QueryInput, reply *warehouseutils.QueryResult) error {
if strings.TrimSpace(s.DestID) == "" {
return errors.New("please specify the destination ID to query the warehouse")
}

var warehouse model.Warehouse
srcMap, ok := connectionsMap[s.DestID]
srcMap, ok := a.bcManager.ConnectionSourcesMap(s.DestID)
if !ok {
return errors.New("please specify a valid and existing destination ID")
}

var warehouse model.Warehouse
// use the sourceID-destID connection if sourceID is not empty
if s.SourceID != "" {
w, ok := srcMap[s.SourceID]
Expand Down Expand Up @@ -92,19 +98,19 @@ func (*WarehouseAdmin) Query(s QueryInput, reply *warehouseutils.QueryResult) er
}
defer client.Close()

pkgLogger.Infof(`[WH Admin]: Querying warehouse: %s:%s`, warehouse.Type, warehouse.Destination.ID)
a.logger.Infof(`[WH Admin]: Querying warehouse: %s:%s`, warehouse.Type, warehouse.Destination.ID)
*reply, err = client.Query(s.SQLStatement)
return err
}

// ConfigurationTest test the underlying warehouse destination
func (*WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error {
func (a *Admin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error {
if strings.TrimSpace(s.DestID) == "" {
return errors.New("please specify the destination ID to query the warehouse")
}

var warehouse model.Warehouse
srcMap, ok := connectionsMap[s.DestID]
srcMap, ok := a.bcManager.ConnectionSourcesMap(s.DestID)
if !ok {
return fmt.Errorf("please specify a valid and existing destinationID: %s", s.DestID)
}
Expand All @@ -114,7 +120,7 @@ func (*WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *Config
break
}

pkgLogger.Infof(`[WH Admin]: Validating warehouse destination: %s:%s`, warehouse.Type, warehouse.Destination.ID)
a.logger.Infof(`[WH Admin]: Validating warehouse destination: %s:%s`, warehouse.Type, warehouse.Destination.ID)

destinationValidator := validations.NewDestinationValidator()
res := destinationValidator.Validate(context.TODO(), &warehouse.Destination)
Expand Down
27 changes: 11 additions & 16 deletions warehouse/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type UploadAPIT struct {
warehouseDBHandle *DB
log logger.Logger
connectionManager *controlplane.ConnectionManager
bcManager *backendConfigManager
isMultiWorkspace bool
}

Expand All @@ -119,7 +120,7 @@ const (
NoSuchSync = "No such sync exist"
)

func InitWarehouseAPI(dbHandle *sql.DB, log logger.Logger) error {
func InitWarehouseAPI(dbHandle *sql.DB, bcManager *backendConfigManager, log logger.Logger) error {
connectionToken, tokenType, isMultiWorkspace, err := deployment.GetConnectionToken()
if err != nil {
return err
Expand Down Expand Up @@ -161,6 +162,7 @@ func InitWarehouseAPI(dbHandle *sql.DB, log logger.Logger) error {
})
},
},
bcManager: bcManager,
}
return nil
}
Expand Down Expand Up @@ -528,27 +530,20 @@ func (uploadReq *UploadReq) validateReq() error {
}

func (uploadReq *UploadReq) authorizeSource(sourceID string) bool {
var authorizedSourceIDs []string
var ok bool
sourceIDsByWorkspaceLock.RLock()
defer sourceIDsByWorkspaceLock.RUnlock()
if authorizedSourceIDs, ok = sourceIDsByWorkspace[uploadReq.WorkspaceID]; !ok {
pkgLogger.Errorf(`Did not find sourceId's in workspace:%s. CurrentList:%v`, uploadReq.WorkspaceID, sourceIDsByWorkspace)
currentList := uploadReq.API.bcManager.SourceIDsByWorkspace()
authorizedSourceIDs, ok := currentList[uploadReq.WorkspaceID]
if !ok {
pkgLogger.Errorf(`Could not find sourceID in workspace %q: %v`, uploadReq.WorkspaceID, currentList)
return false
}
pkgLogger.Debugf(`Authorized sourceId's for workspace:%s - %v`, uploadReq.WorkspaceID, authorizedSourceIDs)

pkgLogger.Debugf(`Authorized sourceID for workspace %q: %v`, uploadReq.WorkspaceID, authorizedSourceIDs)
return slices.Contains(authorizedSourceIDs, sourceID)
}

func (uploadsReq *UploadsReq) authorizedSources() (sourceIDs []string) {
sourceIDsByWorkspaceLock.RLock()
defer sourceIDsByWorkspaceLock.RUnlock()
var ok bool
func (uploadsReq *UploadsReq) authorizedSources() []string {
pkgLogger.Debugf(`Getting authorizedSourceIDs for workspace:%s`, uploadsReq.WorkspaceID)
if sourceIDs, ok = sourceIDsByWorkspace[uploadsReq.WorkspaceID]; !ok {
sourceIDs = []string{}
}
return sourceIDs
return uploadsReq.API.bcManager.SourceIDsByWorkspace()[uploadsReq.WorkspaceID]
}

func (uploadsReq *UploadsReq) getUploadsFromDB(ctx context.Context, isMultiWorkspace bool, query string) ([]*proto.WHUploadResponse, int32, error) {
Expand Down
Loading
Loading