Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: load file upload #3552

Merged
merged 8 commits into from
Jul 5, 2023
Merged

chore: load file upload #3552

merged 8 commits into from
Jul 5, 2023

Conversation

achettyiitr
Copy link
Member

@achettyiitr achettyiitr commented Jun 26, 2023

Description

  • Remove globals: numLoadFileUploadWorkers, slaveUploadTimeout, pkgLogger.
  • Minor refactoring for uploadLoadFiles.

Notion Ticket

https://www.notion.so/rudderstacks/Metrics-with-same-name-and-label-values-16f1ee308ee545848bc12ad63f990f96?pvs=4

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

@codecov
Copy link

codecov bot commented Jun 26, 2023

Codecov Report

Patch coverage: 74.86% and project coverage change: +0.39 🎉

Comparison is base (bed100d) 67.53% compared to head (af56f8b) 67.93%.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3552      +/-   ##
==========================================
+ Coverage   67.53%   67.93%   +0.39%     
==========================================
  Files         321      318       -3     
  Lines       52059    50284    -1775     
==========================================
- Hits        35160    34158    -1002     
+ Misses      14564    13891     -673     
+ Partials     2335     2235     -100     
Impacted Files Coverage Δ
runner/runner.go 70.75% <ø> (+0.16%) ⬆️
utils/misc/misc.go 61.36% <ø> (+8.71%) ⬆️
warehouse/client/client.go 60.27% <ø> (+11.38%) ⬆️
warehouse/encoding/parquetwriter.go 80.00% <ø> (ø)
warehouse/integrations/bigquery/bigquery.go 57.49% <0.00%> (ø)
...ns/datalake/schema-repository/schema_repository.go 66.66% <ø> (ø)
...use/integrations/middleware/sqlquerywrapper/sql.go 92.42% <ø> (ø)
warehouse/integrations/mssql/mssql.go 63.87% <0.00%> (ø)
warehouse/integrations/redshift/redshift.go 66.69% <0.00%> (ø)
warehouse/integrations/snowflake/snowflake.go 64.77% <0.00%> (ø)
... and 18 more

... and 9 files with indirect coverage changes

☔ View full report in Codecov by Sentry.
📢 Do you have feedback about the report comment? Let us know in this issue.

@achettyiitr achettyiitr force-pushed the chore.slave-upload-load-files branch from df04eb4 to fa84107 Compare June 26, 2023 17:32
@achettyiitr achettyiitr force-pushed the chore.slave-upload-load-files branch from fa84107 to 494b5bc Compare June 26, 2023 17:42
Copy link
Collaborator

@fracasula fracasula left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you already know if this fixes the "metrics with same name and label values" issue or it's just an attempt?

warehouse/slave.go Outdated Show resolved Hide resolved
warehouse/slave.go Outdated Show resolved Hide resolved
since: time.Since,
numLoadFileUploadWorkers: config.GetInt("Warehouse.numLoadFileUploadWorkers", 8),
slaveUploadTimeout: config.GetDuration("Warehouse.slaveUploadTimeout", 10, config.GetDuration("Warehouse.slaveUploadTimeoutInMin", 10, time.Minute)),
logger: pkgLogger,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[future-task] We're keeping this for another day then? 🙂

We should consider having an "app handler" like we're doing for the other apps (e.g. gateway) and have the Start function in warehouse.go function be a method of a new app:

type warehouseApp struct {
    // ...
    app app.App
    logger logger.Logger
}

func (a *warehouseApp) Start(ctx context.Context) error {
    // ...

Wdyt? This should help remove the last global variables.

It should also help us remove the direct invocations of the config package like we're doing here in the processStagingFile function. We would be able to have a *config.Config and use that one instead of using the global config.GetInt etc... each time.

Copy link
Member Author

@achettyiitr achettyiitr Jul 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I have already a PR in my local for this. I will share it shortly.

warehouse/slave.go Outdated Show resolved Hide resolved
warehouse/slave.go Outdated Show resolved Hide resolved
warehouse/slave.go Outdated Show resolved Hide resolved
warehouse/slave.go Outdated Show resolved Hide resolved
@achettyiitr
Copy link
Member Author

achettyiitr commented Jun 27, 2023

Do you already know if this fixes the "metrics with same name and label values" issue or it's just an attempt?

It's just an attempt. I tried to reproduce the error using the Unit test but was not able to. It's an open issue:

prometheus/client_golang#1269
prometheus/client_golang#1231

warehouse/slave.go Outdated Show resolved Hide resolved
warehouse/slave.go Outdated Show resolved Hide resolved
warehouse/slave.go Outdated Show resolved Hide resolved
warehouse/slave.go Outdated Show resolved Hide resolved
warehouse/slave.go Outdated Show resolved Hide resolved
warehouse/slave.go Outdated Show resolved Hide resolved
warehouse/slave.go Outdated Show resolved Hide resolved
Comment on lines 244 to 248
defer func() {
if totalUploadTime > 0 {
jr.timerStat("load_file_upload_time").SendTiming(totalUploadTime)
}
}()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure this is the right approach? Isn't this metric meant to track the time it takes for each upload of a load file to take place? This function is called uploadLoadFiles and it's uploading multiple load files, correct? Why don't we invoke this from within the uploadLoadFile lambda function then?

We could do it right before the uploader.Upload call so that we measure just how long it takes to upload.

Copy link
Member Author

@achettyiitr achettyiitr Jul 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 staging file is broken down into multiple load files, I thought the error Metrics with the same name and label values was caused because we are sending the metrics multiple times from multiple goroutines for some race conditions.
So, instead of sending metrics for each file, we can just send the total upload time metrics?

Copy link
Member Author

@achettyiitr achettyiitr Jul 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will something like this work?

	defer func() {
		if err == nil {
			jr.timerStat("load_file_total_upload_time").SendTiming(totalUploadTime.Load())
		}
	}()

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I understand why you redesigned this but I don't think that is the culprit. Also, we should be able to send multiple timings for the same timer concurrently. The underlying framework should handle that.

From a metrics perspective it is correct to measure how long it takes to upload each file. I think we should keep that like it was before so that we can have more useful metrics in our dashboards. Having the total is a plus and we could consider it if we think it is useful and we cannot derive it with a promql query.

Comment on lines -21 to -24
type Tag struct {
Name string
Value string
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have already this present in warehouseutils.Tag

@achettyiitr achettyiitr force-pushed the chore.slave-upload-load-files branch 6 times, most recently from 65543a4 to 2e1883f Compare July 4, 2023 08:06
@achettyiitr achettyiitr force-pushed the chore.slave-upload-load-files branch from 2e1883f to 3f2ae82 Compare July 4, 2023 08:45
Copy link
Collaborator

@fracasula fracasula left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, added a few comments, mostly optional.

warehouse/slave_test.go Outdated Show resolved Hide resolved
warehouse/slave_test.go Outdated Show resolved Hide resolved
Comment on lines +232 to +235
if tc.wantError != nil {
require.EqualError(t, err, tc.wantError.Error())
return
}
Copy link
Collaborator

@fracasula fracasula Jul 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[optional] could be simplified but you need to change wantError to string:

Suggested change
if tc.wantError != nil {
require.EqualError(t, err, tc.wantError.Error())
return
}
if tc.wantError != "" {
require.EqualError(t, err, tc.wantError)
return
}

warehouse/stats.go Outdated Show resolved Hide resolved
warehouse/slave.go Outdated Show resolved Hide resolved
@achettyiitr
Copy link
Member Author

Thanks, @fracasula for the detailed feedback.

Copy link
Member

@lvrach lvrach left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super minor stuff

warehouse/slave.go Outdated Show resolved Hide resolved

type asyncJobRunResult struct {
Result bool
Id string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor]

Suggested change
Id string
ID string

We can use JSON tag if we marshal this

Comment on lines +106 to +110
if conf.IsSet("Warehouse.slaveUploadTimeout") {
jr.config.slaveUploadTimeout = conf.GetDuration("Warehouse.slaveUploadTimeout", 10, time.Minute)
} else {
jr.config.slaveUploadTimeout = conf.GetDuration("Warehouse.slaveUploadTimeoutInMin", 10, time.Minute)
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried chaining but it is not working as expected:

jr.config.slaveUploadTimeout = conf.GetDuration("Warehouse.slaveUploadTimeout", 10, conf.GetDuration("Warehouse.slaveUploadTimeoutInMin", 10, time.Minute))
func TestGetDuration(t *testing.T) {
	{
		c := New()
		t.Log(c.GetDuration("durationInMin", 5, c.GetDuration("duration", 5, time.Minute)))
	}
	{
		c := New()
		c.Set("duration", "10m")
		t.Log(c.GetDuration("durationInMin", 5, c.GetDuration("duration", 5, time.Minute)))
	}
	{
		c := New()
		c.Set("durationInMin", "20m")
		t.Log(c.GetDuration("durationInMin", 5, c.GetDuration("duration", 5, time.Minute)))
	}
	{
		c := New()
		c.Set("duration", "10m")
		c.Set("durationInMin", "20m")
		t.Log(c.GetDuration("durationInMin", 5, c.GetDuration("duration", 5, time.Minute)))
	}
}
    config_test.go:327: 25m0s
    config_test.go:332: 50m0s
    config_test.go:337: 20m0s
    config_test.go:343: 20m0s

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants