feat(warehouse): glue partitions #2899

Merged: 16 commits, merged on Feb 1, 2023

@achettyiitr (Member) commented Jan 24, 2023

Description

Glue partitions

  • AWS Glue supports partitioning. Once the load files are generated, we refresh the partitions.
  • We call refresh partitions only after the data has been loaded, because it depends on the schema, table, and columns already having been created.
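The ordering constraint in the second bullet can be sketched as follows. This is a minimal illustration, not the PR's code: the `Repository` interface, its method names, and the `recorder` fake are hypothetical stand-ins. The fake refuses to refresh partitions for a table that does not exist yet, which is the dependency the bullet describes.

```go
package main

import (
	"errors"
	"fmt"
)

// Repository is a hypothetical, trimmed-down stand-in for a warehouse
// schema repository; the real interface in the PR may differ.
type Repository interface {
	CreateSchema() error
	CreateTable(name string, columns map[string]string) error
	RefreshPartitions(table string, loadFiles []string) error
}

// loadThenRefresh creates the schema and table first and refreshes
// partitions last, because partitions reference the table and its columns.
func loadThenRefresh(r Repository, table string, columns map[string]string, loadFiles []string) error {
	if err := r.CreateSchema(); err != nil {
		return fmt.Errorf("create schema: %w", err)
	}
	if err := r.CreateTable(table, columns); err != nil {
		return fmt.Errorf("create table: %w", err)
	}
	// ... loading the data would happen here ...
	if err := r.RefreshPartitions(table, loadFiles); err != nil {
		return fmt.Errorf("refresh partitions: %w", err)
	}
	return nil
}

// recorder records the call order and rejects a partition refresh
// for a table that has not been created.
type recorder struct {
	calls  []string
	tables map[string]bool
}

func (r *recorder) CreateSchema() error {
	r.calls = append(r.calls, "schema")
	return nil
}

func (r *recorder) CreateTable(name string, _ map[string]string) error {
	r.calls = append(r.calls, "table")
	r.tables[name] = true
	return nil
}

func (r *recorder) RefreshPartitions(table string, _ []string) error {
	r.calls = append(r.calls, "partitions")
	if !r.tables[table] {
		return errors.New("table not found")
	}
	return nil
}

func main() {
	r := &recorder{tables: map[string]bool{}}
	err := loadThenRefresh(r, "tracks", map[string]string{"id": "string"}, []string{"f1"})
	fmt.Println(err, r.calls)
}
```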

Reference: #2476

Notion Ticket

https://www.notion.so/rudderstacks/AWS-Glue-partitioning-b80301553aed40958df49c3a9beeba40

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

@codecov bot commented Jan 24, 2023

Codecov Report

Base: 51.33% // Head: 51.73% // Increases project coverage by +0.40% 🎉

Coverage data is based on head (63fcc30) compared to base (d049f2b).
Patch coverage: 65.24% of modified lines in pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2899      +/-   ##
==========================================
+ Coverage   51.33%   51.73%   +0.40%     
==========================================
  Files         321      321              
  Lines       50646    50791     +145     
==========================================
+ Hits        25997    26279     +282     
+ Misses      23079    22931     -148     
- Partials     1570     1581      +11     
| Impacted Files | Coverage Δ |
|---|---|
| warehouse/upload.go | 21.48% <0.00%> (-0.44%) ⬇️ |
| warehouse/utils/utils.go | 70.07% <ø> (-0.90%) ⬇️ |
| ...e/integrations/datalake/schema-repository/local.go | 89.28% <63.63%> (+89.28%) ⬆️ |
| ...se/integrations/datalake/schema-repository/glue.go | 77.32% <74.07%> (+77.32%) ⬆️ |
| ...ns/datalake/schema-repository/schema_repository.go | 77.27% <93.75%> (ø) |
| warehouse/internal/loadfiles/loadfiles.go | 72.68% <95.23%> (+2.63%) ⬆️ |
| jobsdb/jobsdb.go | 73.95% <0.00%> (+0.12%) ⬆️ |
| utils/misc/misc.go | 49.15% <0.00%> (+0.67%) ⬆️ |
| warehouse/schema.go | 51.42% <0.00%> (+1.14%) ⬆️ |
| services/rsources/handler.go | 75.20% <0.00%> (+1.37%) ⬆️ |
| ... and 4 more | |


@achettyiitr marked this pull request as ready for review on January 24, 2023, 22:28
if err != nil {
	return err
}
schema, _, _ := ls.FetchSchema(ls.warehouse)
Member Author

Since FetchSchema always returns a nil error, I'm ignoring it.

@lvrach (Member) commented Jan 25, 2023

Should we remove the err from FetchSchema then? We can reintroduce it once an error is possible.

I see the schema needs to satisfy an interface.

One possible way to work around this:

func (ls *LocalSchemaRepository) localFetchSchema() warehouseutils.SchemaT {
	return ls.uploader.GetLocalSchema()
}

func (ls *LocalSchemaRepository) FetchSchema(_ warehouseutils.Warehouse) (warehouseutils.SchemaT, warehouseutils.SchemaT, error) {
	s := ls.localFetchSchema()
	return s, s, nil
}

func (ls *LocalSchemaRepository) CreateTable(tableName string, columnMap map[string]string) (err error) {
	schema := ls.localFetchSchema()
	// ... rest of CreateTable unchanged ...
}
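The workaround above keeps the error only in the interface-facing method. A compile-time assertion makes that contract explicit, so the build breaks if the wrapper ever stops matching the interface. This is a self-contained sketch: `SchemaT`, `Warehouse`, `SchemaRepository`, `localSchemaSource`, and `staticUploader` are hypothetical stand-ins for the warehouseutils and uploader types, not their real definitions.

```go
package main

import "fmt"

// SchemaT and Warehouse are hypothetical stand-ins for the warehouseutils types.
type SchemaT map[string]map[string]string

type Warehouse struct{ Namespace string }

// SchemaRepository is an assumed shape of the interface FetchSchema must satisfy.
type SchemaRepository interface {
	FetchSchema(w Warehouse) (SchemaT, SchemaT, error)
}

// localSchemaSource stands in for the uploader that owns the local schema.
type localSchemaSource interface {
	GetLocalSchema() SchemaT
}

type LocalSchemaRepository struct {
	uploader  localSchemaSource
	warehouse Warehouse
}

// Compile-time check: fails the build if LocalSchemaRepository
// no longer satisfies SchemaRepository.
var _ SchemaRepository = (*LocalSchemaRepository)(nil)

// localFetchSchema cannot fail, so it has no error result.
func (ls *LocalSchemaRepository) localFetchSchema() SchemaT {
	return ls.uploader.GetLocalSchema()
}

// FetchSchema keeps the error result only to satisfy the interface.
func (ls *LocalSchemaRepository) FetchSchema(_ Warehouse) (SchemaT, SchemaT, error) {
	s := ls.localFetchSchema()
	return s, s, nil
}

type staticUploader struct{ schema SchemaT }

func (u staticUploader) GetLocalSchema() SchemaT { return u.schema }

func main() {
	ls := &LocalSchemaRepository{uploader: staticUploader{schema: SchemaT{"tracks": {"id": "string"}}}}
	s, _, err := ls.FetchSchema(ls.warehouse)
	fmt.Println(s["tracks"]["id"], err)
}
```

Internal callers such as CreateTable can then use `localFetchSchema()` directly and never see an error they would have to ignore.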


@@ -482,6 +484,10 @@ func (job *UploadJobT) run() (err error) {
job.matchRowsInStagingAndLoadFiles()
job.recordLoadFileGenerationTimeStat(startLoadFileID, endLoadFileID)

if err = job.RefreshPartitions(startLoadFileID, endLoadFileID); err != nil {
	break
Member

Should we log the error? Possibly as a Warn?

Comment on lines 2109 to 2116
// This is best done every 100 files, since it's a batch request for updates in Glue
partitionBatchSize := config.GetInt("Warehouse.refreshPartitionBatchSize", 99)
for i := 0; i < len(loadFiles); i += partitionBatchSize {
	end := i + partitionBatchSize
	if end > len(loadFiles) {
		end = len(loadFiles)
	}

Member

Would it be better to move the batching logic to the GlueSchemaRepository or GlueClient?

It seems like destination-specific logic.

Also, why is the default in GetInt("Warehouse.refreshPartitionBatchSize", 99) 99 and not 100?

Member Author

The SDK limits each batch to 100 items. These changes were contributed by an open-source user, who kept it at 99.
5c8a8c9
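If the batching moves into the Glue repository as suggested, the configured value could also be clamped there. The loop above advances with `i += partitionBatchSize`, so a configured value of 0 or less would never terminate, and a value above 100 would exceed the per-request limit mentioned in the reply. A minimal sketch, assuming the limit is exactly 100 as stated in this thread; `glueMaxBatchSize` and `partitionBatchSize` are hypothetical names, and falling back to the limit (rather than the existing default of 99) is a design choice of this sketch.

```go
package main

import "fmt"

// glueMaxBatchSize is the per-request limit cited in this discussion.
const glueMaxBatchSize = 100

// partitionBatchSize clamps a configured batch size into 1..glueMaxBatchSize.
// Non-positive values would make `i += size` loop forever; values above the
// limit would exceed what a single request accepts.
func partitionBatchSize(configured int) int {
	if configured <= 0 || configured > glueMaxBatchSize {
		return glueMaxBatchSize
	}
	return configured
}

func main() {
	for _, v := range []int{99, 100, 0, 500} {
		fmt.Println(v, "->", partitionBatchSize(v))
	}
}
```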

uploadOutput, err := fm.Upload(context.TODO(), f, fmt.Sprintf("rudder-test-payload/s3-datalake/%s/dt=2006-01-02/", warehouseutils.RandHex()))
require.NoError(t, err)

err = g.RefreshPartitions(testTable, []warehouseutils.LoadFileT{
Member

How are we checking the side effects of refresh partitions?

Member Author

Currently, we have only added the round-trip flow for Glue, since this requires an external service call to Glue. Should we mock the API call and test this?

fileBatches := make([][]warehouseutils.LoadFileT, 0, len(files)/batchSize+1)

for len(files) > 0 {
	cut := int(math.Min(float64(len(files)), float64(batchSize)))
Member

In general, I would avoid using float64 for anything that requires precision: https://jvns.ca/blog/2023/01/13/examples-of-floating-point-problems/#example-1-the-odometer-that-stopped

However, I cannot think of an example where this would fail, so take this as a minor suggestion:

Suggested change
cut := int(math.Min(float64(len(files)), float64(batchSize)))
cut := batchSize
if len(files) < cut {
	cut = len(files)
}
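Put together, the batching loop can be written with integer comparison only. This is a self-contained sketch, not the merged code: `LoadFile` is a hypothetical stand-in for `warehouseutils.LoadFileT`, and returning an error for a non-positive batch size is an assumption added so a bad config value cannot loop forever.

```go
package main

import "fmt"

// LoadFile is a hypothetical stand-in for warehouseutils.LoadFileT.
type LoadFile struct{ Location string }

// batchLoadFiles splits files into consecutive batches of at most batchSize
// elements. The batches are subslices sharing the input's backing array.
func batchLoadFiles(files []LoadFile, batchSize int) ([][]LoadFile, error) {
	if batchSize <= 0 {
		return nil, fmt.Errorf("batch size must be positive, got %d", batchSize)
	}
	fileBatches := make([][]LoadFile, 0, len(files)/batchSize+1)
	for len(files) > 0 {
		// Take the smaller of the remaining count and the batch size.
		cut := batchSize
		if len(files) < cut {
			cut = len(files)
		}
		fileBatches = append(fileBatches, files[:cut])
		files = files[cut:]
	}
	return fileBatches, nil
}

func main() {
	batches, err := batchLoadFiles(make([]LoadFile, 250), 100)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, b := range batches {
		fmt.Println(len(b))
	}
}
```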

Member Author

Thanks for pointing that out, @lvrach.

Member Author

For some reason, I was expecting the math package to have func max(x, y int) int. There was another package by Dave Cheney, but it's archived now. There is a proposal for a generic implementation of this.
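With type parameters (Go 1.18+), the integer helpers the author was looking for take only a few lines and avoid the float64 round trip. Go 1.21 later added built-in `min` and `max`, so on 1.21 or newer `cut := min(len(files), batchSize)` works without any helper. The `Integer` constraint, `minOf`, and `maxOf` below are local, illustrative names, chosen so they do not shadow those built-ins.

```go
package main

import "fmt"

// Integer is a local constraint covering Go's built-in integer kinds.
type Integer interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
}

// minOf returns the smaller of a and b without converting to float64.
func minOf[T Integer](a, b T) T {
	if a < b {
		return a
	}
	return b
}

// maxOf returns the larger of a and b without converting to float64.
func maxOf[T Integer](a, b T) T {
	if a > b {
		return a
	}
	return b
}

func main() {
	fmt.Println(minOf(250, 100), maxOf(int64(7), 42))
}
```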


3 participants