chore: move warehouse handle to router #3687
@@ -152,16 +149,16 @@ func (wh *HandleT) hasLocalIdentityData(warehouse model.Warehouse) (exists bool)
 	`,
 		warehouseutils.IdentityMergeRulesTableName(warehouse),
 	)
-	err := wh.dbHandle.QueryRow(sqlStatement).Scan(&exists)
+	err := r.dbHandle.QueryRow(sqlStatement).Scan(&exists)
 	if err != nil {
 		// TODO: Handle this
 		panic(fmt.Errorf("Query: %s\nfailed with Error : %w", sqlStatement, err))
For my information, how come we panic here? Can't we handle it?
P.S.: the naked return feels harder to understand in this case, I suggest we use normal returns if you're OK with it as well.
A follow-up task can also be created to handle this panic and possibly other ones too, but first it'd be nice to understand the scope of this.
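A minimal sketch of what the two comments above suggest, under stated assumptions: the function returns the failure with explicit (non-naked) returns, so the caller decides how to react. `existsQuerier`, the free-function signature and the SQL text are hypothetical stand-ins for `r.dbHandle` and the elided statement; they are not taken from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// existsQuerier is a hypothetical stand-in for r.dbHandle: it runs a query
// that yields a single boolean. It is not part of the PR.
type existsQuerier func(query string) (bool, error)

// hasLocalIdentityData reports whether the table holds any rows. Instead of
// panicking it wraps the failure and returns it with explicit return values.
func hasLocalIdentityData(q existsQuerier, table string) (bool, error) {
	// Illustrative query only; the real statement is elided in the diff.
	sqlStatement := fmt.Sprintf(`SELECT EXISTS (SELECT 1 FROM %s)`, table)
	exists, err := q(sqlStatement)
	if err != nil {
		return false, fmt.Errorf("query %s failed: %w", sqlStatement, err)
	}
	return exists, nil
}

// errConnection and the two queriers below simulate database behaviour.
var errConnection = errors.New("connection refused")

func failingQuerier(string) (bool, error) { return false, errConnection }

func okQuerier(string) (bool, error) { return true, nil }

func main() {
	exists, err := hasLocalIdentityData(okQuerier, "identity_merge_rules")
	fmt.Println(exists, err)

	// The caller can now inspect the cause instead of crashing.
	_, err = hasLocalIdentityData(failingQuerier, "identity_merge_rules")
	fmt.Println(errors.Is(err, errConnection))
}
```

Because the cause is wrapped with `%w`, callers can still match it with `errors.Is` or `errors.As` while deciding whether to retry, abort the upload, or log and continue.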
@@ -72,121 +54,3 @@ func getMockStats(g GinkgoTInterface) (*mock_stats.MockStats, *mock_stats.MockMe
 	mockMeasurement := mock_stats.NewMockMeasurement(ctrl)
 	return mockStats, mockMeasurement
 }
-
-func TestUploadJob_ProcessingStats(t *testing.T) {
This is moved to Router Processor Stats test.
@@ -1,136 +0,0 @@
-package warehouse_test
This is moved to router_scheduling_test.go
Still reviewing, posting a few questions in the meantime.
warehouse/router_identities.go
-func (wh *HandleT) hasWarehouseData(ctx context.Context, warehouse model.Warehouse) (bool, error) {
-	whManager, err := manager.New(wh.destType, wh.conf, wh.Logger, wh.stats)
+func (r *Router) hasWarehouseData(ctx context.Context, warehouse model.Warehouse) (bool, error) {
+	whManager, err := manager.New(r.destType, r.conf, r.logger, r.stats)
We might not need to create a new instance each time.
This is used in other places too so it might make sense to create it only once (see line 399).
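As a rough illustration of creating the manager only once, the sketch below caches it behind `sync.Once`. It assumes the manager is safe to reuse across calls, which this conversation does not confirm. `whManager`, `fakeManager`, `router` and the `newManager` factory are hypothetical names standing in for the real `manager.New` call and the `Router` type.

```go
package main

import (
	"fmt"
	"sync"
)

// whManager is a hypothetical stand-in for the warehouse manager interface.
type whManager interface{ Name() string }

// fakeManager is a trivial implementation used only for this sketch.
type fakeManager struct{ destType string }

func (m fakeManager) Name() string { return m.destType }

// router is a simplified stand-in for Router that builds its manager lazily.
type router struct {
	destType   string
	newManager func(destType string) (whManager, error) // hypothetical factory

	once   sync.Once
	mgr    whManager
	mgrErr error
}

// warehouseManager creates the manager on first use and returns the cached
// value (or the cached construction error) on every later call.
func (r *router) warehouseManager() (whManager, error) {
	r.once.Do(func() {
		r.mgr, r.mgrErr = r.newManager(r.destType)
	})
	return r.mgr, r.mgrErr
}

func main() {
	calls := 0
	r := &router{
		destType: "POSTGRES",
		newManager: func(destType string) (whManager, error) {
			calls++
			return fakeManager{destType: destType}, nil
		},
	}
	for i := 0; i < 3; i++ {
		if _, err := r.warehouseManager(); err != nil {
			fmt.Println("error:", err)
			return
		}
	}
	fmt.Println("factory calls:", calls)
}
```

One trade-off of this design is that `sync.Once` also caches a construction error, so a transient failure would never be retried. If the manager holds per-upload state, creating it per call as the PR does may still be the right choice.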
-func (wh *HandleT) hasWarehouseData(ctx context.Context, warehouse model.Warehouse) (bool, error) {
-	whManager, err := manager.New(wh.destType, wh.conf, wh.Logger, wh.stats)
+func (r *Router) hasWarehouseData(ctx context.Context, warehouse model.Warehouse) (bool, error) {
+	whManager, err := manager.New(r.destType, r.conf, r.logger, r.stats)
 	if err != nil {
 		panic(err)
Why not return an error here?
warehouse/router_identities.go
@@ -173,10 +170,10 @@ func (wh *HandleT) hasWarehouseData(ctx context.Context, warehouse model.Warehou
 	return !empty, nil
 }

-func (wh *HandleT) setupIdentityTables(ctx context.Context, warehouse model.Warehouse) {
+func (r *Router) setupIdentityTables(ctx context.Context, warehouse model.Warehouse) {
Wouldn't it make sense to create the table and its indexes in the same transaction instead of panicking each time with the risk of leaving the database in an "incomplete" state?
Just asking btw, not saying we have to address this in this PR but let's discuss if needed.
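The single-transaction idea raised above could look roughly like the sketch below. It assumes the local database supports transactional DDL (PostgreSQL does). The `txn` interface is a subset of methods that `*sql.Tx` provides; `runInTx`, `fakeTx`, `demo` and both DDL statements are hypothetical and only illustrate the control flow.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// txn is the subset of *sql.Tx used below; *sql.Tx satisfies it.
type txn interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	Commit() error
	Rollback() error
}

// runInTx executes every statement on one transaction. On the first failure
// it rolls back and returns the error, so no partial schema is left behind.
func runInTx(ctx context.Context, tx txn, statements []string) error {
	for _, stmt := range statements {
		if _, err := tx.ExecContext(ctx, stmt); err != nil {
			err = fmt.Errorf("executing %q: %w", stmt, err)
			if rbErr := tx.Rollback(); rbErr != nil {
				err = errors.Join(err, fmt.Errorf("rolling back: %w", rbErr))
			}
			return err
		}
	}
	return tx.Commit()
}

// Hypothetical DDL; the real table and index definitions are not shown here.
const (
	createTable = `CREATE TABLE IF NOT EXISTS merge_rules (id BIGSERIAL PRIMARY KEY)`
	createIndex = `CREATE INDEX IF NOT EXISTS merge_rules_id_idx ON merge_rules (id)`
)

// fakeTx records what happened so the sketch runs without a database.
type fakeTx struct {
	failOn                string
	executed              []string
	committed, rolledBack bool
}

func (f *fakeTx) ExecContext(_ context.Context, query string, _ ...any) (sql.Result, error) {
	if query == f.failOn {
		return nil, errors.New("simulated failure")
	}
	f.executed = append(f.executed, query)
	return nil, nil
}

func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

// demo runs both statements against a fake transaction that fails on failOn.
func demo(failOn string) (*fakeTx, error) {
	tx := &fakeTx{failOn: failOn}
	err := runInTx(context.Background(), tx, []string{createTable, createIndex})
	return tx, err
}

func main() {
	tx, err := demo("")
	fmt.Println(tx.committed, tx.rolledBack, err)

	tx, err = demo(createIndex)
	fmt.Println(tx.committed, tx.rolledBack, err != nil)
}
```

With a real database the transaction would come from `db.BeginTx(ctx, nil)` and be passed to `runInTx` unchanged; the function returns an error instead of panicking, which also addresses the "incomplete state" concern.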
-		pkgLogger.Infof("[WH]: Did not find local identity tables..")
-		pkgLogger.Infof("[WH]: Generating identity tables based on data in warehouse %s:%s", wh.destType, warehouse.Destination.ID)
-		upload = wh.initPrePopulateDestIdentitiesUpload(warehouse)
+		r.logger.Infof("[WH]: Did not find local identity tables..")
r.logger.Infof("[WH]: Did not find local identity tables..") |
@@ -441,15 +438,15 @@ func (wh *HandleT) populateHistoricIdentities(ctx context.Context, warehouse mod

 	err = job.schemaHandle.fetchSchemaFromWarehouse(ctx, whManager)
 	if err != nil {
-		pkgLogger.Errorf(`[WH]: Failed fetching schema from warehouse: %v`, err)
+		r.logger.Errorf(`[WH]: Failed fetching schema from warehouse: %v`, err)
 		job.setUploadError(err, model.Aborted)
How come we are ignoring this error? We're doing the same with other calls to setUploadError or setUploadStatus.
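One possible way to stop dropping that error, sketched under assumptions: if recording the failure itself fails, both errors are returned together via `errors.Join` (Go 1.20+). `statusSetter` and `abortUpload` are hypothetical; the real `setUploadError` signature may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// statusSetter is a hypothetical stand-in for job.setUploadError; the real
// method may take different arguments and return different values.
type statusSetter func(cause error, state string) error

// abortUpload records the failure and returns the original cause. If the
// status update also fails, both errors are joined so neither is lost.
func abortUpload(set statusSetter, cause error) error {
	if err := set(cause, "aborted"); err != nil {
		return errors.Join(cause, fmt.Errorf("setting upload error: %w", err))
	}
	return cause
}

// Sentinel errors used only to drive the example.
var (
	errFetchSchema = errors.New("failed fetching schema from warehouse")
	errStatusWrite = errors.New("status update failed")
)

func main() {
	failingSetter := func(error, string) error { return errStatusWrite }
	err := abortUpload(failingSetter, errFetchSchema)

	// Both causes remain discoverable on the joined error.
	fmt.Println(errors.Is(err, errFetchSchema), errors.Is(err, errStatusWrite))
}
```

Logging the secondary error instead of returning it would be a lighter alternative when the caller cannot act on it.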
f0cde37 to 0fbd947
0fbd947 to 53a1f6d
…ments' into chore.whBackendConfig+warehouse-handle-to-router
d84af52 to f6cb961
@fracasula Added a task for addressing the review comments for router_identities.
I have noticed we are using fmt.Sprintf to pass arguments into the SQL query; we should use prepared statements instead.
We can have a separate task for the prepared statements.
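For that follow-up task, a small sketch of the difference, assuming a PostgreSQL-style `$1` placeholder: values travel as bound arguments, while identifiers such as table names cannot be bound and therefore have to be quoted. `quoteIdentifier`, `existsQuery` and the `workspace_id` column are hypothetical and not taken from the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdentifier double-quotes a PostgreSQL identifier and doubles any
// embedded quote, since identifiers cannot be passed as bound parameters.
func quoteIdentifier(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// existsQuery builds a parameterised query: the table name is quoted, and
// the value is returned separately so the driver binds it as $1.
// The workspace_id column is a hypothetical example.
func existsQuery(table, workspaceID string) (string, []any) {
	query := fmt.Sprintf(
		`SELECT EXISTS (SELECT 1 FROM %s WHERE workspace_id = $1)`,
		quoteIdentifier(table),
	)
	return query, []any{workspaceID}
}

func main() {
	query, args := existsQuery("identity_merge_rules", "ws-1")
	fmt.Println(query)
	fmt.Println(args...)
	// With database/sql this would be executed as:
	//   db.QueryRowContext(ctx, query, args...).Scan(&exists)
}
```

When the same statement runs many times, `db.PrepareContext` can prepare it once and the resulting `*sql.Stmt` can be reused with different arguments.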
Description
Move identities.go and scheduling.go into router_identities.go and router_scheduling.go, within the router scope.
Notion Ticket
https://www.notion.so/rudderstacks/Move-warehouseHandleT-to-rotuer-354c3e45fc674c3e988d1a79d244a5ff?pvs=4
Security