-
Notifications
You must be signed in to change notification settings - Fork 297
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: warehouse backend config refactoring #3602
Conversation
696a1d7
to
5a0283b
Compare
d798c8d
to
7bd3c22
Compare
6301bf8
to
5bbf8d8
Compare
5bbf8d8
to
e3a6c6e
Compare
…re.whBackendConfig
62dee81
to
1c8e8fe
Compare
var connectionFlags backendconfig.ConnectionFlags | ||
for _, wConfig := range config { | ||
connectionFlags = wConfig.ConnectionFlags // the last connection flags should be enough, since they are all the same in multi-workspace environments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are already doing this in bcManager, no need do it here.
if val, ok := connectionFlags.Services["warehouse"]; ok { | ||
if UploadAPI.connectionManager != nil { | ||
UploadAPI.connectionManager.Apply(connectionFlags.URL, val) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are already doing this in bcManager, no need do it here.
@@ -193,173 +190,3 @@ func TestUploadJob_ProcessingStats(t *testing.T) { | |||
}) | |||
} | |||
} | |||
|
|||
func Test_GetNamespace(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is moved to bcManager.
bcm.internalControlPlaneClient = cpclient.NewInternalClientWithCache( | ||
backendconfig.GetConfigBackendURL(), | ||
c.GetString("CONFIG_BACKEND_URL", "https://api.rudderstack.com"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we keep duplicating the variable name and its default? Isn't GetConfigBackendURL
good enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to avoid calling global function for backendconfig was difficult to test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, because you replaced the bc with the tenant manager. Let's keep it this way for now then 👍
case data, ok := <-ch: | ||
if !ok { | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@achettyiitr nice catch 👍
|
||
namespace, err := s.schema.GetNamespace(ctx, source.ID, destination.ID) | ||
if err != nil { | ||
s.logger.Errorw("getting namespace", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@achettyiitr we don't have enough context here, right? We should at least log the error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Let me make the changes in a separate PR.
for _, tc := range testcases { | ||
tc := tc | ||
|
||
t.Run("should return namespace", func(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@achettyiitr How come the test name is always the same? Shall we include a name for the test in the test case itself? Something that would describe the type of test in order to make it self-explanatory would be nice.
@@ -1447,12 +1405,23 @@ func Start(ctx context.Context, app app.App) error { | |||
} | |||
}() | |||
|
|||
g, ctx := errgroup.WithContext(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@achettyiitr it feels like we don't need a group here since none of the two routines are returning an error. also, we're overriding the context with the group one which could cause confusion in the code after the group that might be using the context. wdyt?
warehouse/backend_config.go
Outdated
s.warehousesMu.Lock() | ||
if len(s.warehouses) > 0 { | ||
ch <- s.warehouses | ||
} | ||
s.warehousesMu.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@achettyiitr this version was better, now we're holding the lock for the duration of the whole subscribe. I think we should put this code back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason, we are adding this is because there is a race condition happening if we are not subscribers and we get backed config. The subscriber will not get anything until the next time we get the backend config changes
So, during the subscription also, we are sending the last copy of the data.
warehouses = lo.Filter(warehouses, func(warehouse model.Warehouse, _ int) bool { | ||
return warehouse.Destination.DestinationDefinition.Name == wh.destType | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@achettyiitr we could also do this to save ourselves the next for
loop:
warehouses = lo.Filter(warehouses, func(warehouse model.Warehouse, _ int) bool {
if warehouse.Destination.DestinationDefinition.Name != wh.destType {
return false
}
if warehouseutils.IDResolutionEnabled() &&
slices.Contains(warehouseutils.IdentityEnabledWarehouses, wh.destType) {
wh.setupIdentityTables(ctx, warehouse)
if shouldPopulateHistoricIdentities && warehouse.Destination.Enabled {
// non-blocking populate historic identities
wh.populateHistoricIdentities(ctx, warehouse)
}
}
return true
})
Description
Small refactoring for the backend config in the warehouse.
Changes:
Setup()
ifisMaster()
TODOs
write tests for new backend config-race
follow up on newTODO
comments added in the codeNotion Ticket
< Notion Link >
Security