
feat: archive jobs to object storage #3721

Merged
merged 10 commits into master from feat.archiver on Aug 17, 2023

Conversation

Sidddddarth
Member

@Sidddddarth Sidddddarth commented Aug 8, 2023

Description

The Processor writes events to a new jobsdb: archivalJobsDB.
The Archiver reads jobs from the archival jobsdb and uploads them to user-configured object storage (if eligible).
Enabled for gateway jobs (archived at the source level, and further partitioned into an hourly folder under the source prefix); see the path-layout sketch below.
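
A minimal sketch of the layout this implies; archivePrefix and the "rudder-backups" root are hypothetical illustrations, not the PR's actual code or prefix:

```go
package main

import (
	"fmt"
	"time"
)

// archivePrefix builds an object-storage key prefix for a source's archived
// jobs: one prefix per source, partitioned into hourly folders.
func archivePrefix(sourceID string, t time.Time) string {
	return fmt.Sprintf("rudder-backups/%s/%s/", sourceID, t.UTC().Format("2006-01-02/15"))
}

func main() {
	// prints: rudder-backups/src-123/2023-08-17/05/
	fmt.Println(archivePrefix("src-123", time.Date(2023, 8, 17, 5, 46, 0, 0, time.UTC)))
}
```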

BenchmarkArchiverIsolation/numWorkspaces:_25_-_numSourcePerWorkspace:_4_-_numJobsPerSource:_1000_-_totalJobs:_100000-10
    1  33437059167 ns/op  33.44 overall_duration_seconds  1571687416 B/op  26652032 allocs/op

BenchmarkArchiverIsolation/numWorkspaces:_100_-_numSourcePerWorkspace:_5_-_numJobsPerSource:_1000_-_totalJobs:_500000-10
    1  134082533166 ns/op  134.1 overall_duration_seconds  8210319192 B/op  139518565 allocs/op

Linear Ticket

Notion Link
feed the archival

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

@codecov

codecov bot commented Aug 14, 2023

Codecov Report

Patch coverage: 70.53% and project coverage change: +0.27% 🎉

Comparison is base (264c52a) 68.55% compared to head (c73aca2) 68.82%.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3721      +/-   ##
==========================================
+ Coverage   68.55%   68.82%   +0.27%     
==========================================
  Files         344      347       +3     
  Lines       51565    51989     +424     
==========================================
+ Hits        35349    35783     +434     
+ Misses      13935    13921      -14     
- Partials     2281     2285       +4     
Files Changed Coverage Δ
app/apphandlers/processorAppHandler.go 8.56% <0.00%> (-0.77%) ⬇️
app/cluster/dynamic.go 54.64% <40.00%> (-0.85%) ⬇️
archiver/worker.go 57.37% <57.37%> (ø)
archiver/options.go 66.66% <66.66%> (ø)
app/apphandlers/embeddedAppHandler.go 76.67% <85.71%> (+0.72%) ⬆️
processor/processor.go 87.71% <87.75%> (+0.21%) ⬆️
archiver/archiver.go 92.74% <92.74%> (ø)
jobsdb/admin.go 83.63% <100.00%> (ø)
jobsdb/jobsdb.go 72.83% <100.00%> (+0.20%) ⬆️
processor/manager.go 90.16% <100.00%> (+0.16%) ⬆️

... and 21 files with indirect coverage changes


)

for _, job := range jobs {
	j, err := marshalJob(job)
Member

You might want to use json.Encoder instead and write directly to the gzWriter: https://pkg.go.dev/encoding/json#NewEncoder.

Fewer lines of code and possibly fewer memory allocations.
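
A minimal sketch of this suggestion, assuming a hypothetical Job type and writeJobs helper (the PR's actual marshalJob/gzWriter code may differ):

```go
package archiver

import (
	"compress/gzip"
	"encoding/json"
	"os"
)

// Job is a hypothetical stand-in for the jobsdb job type.
type Job struct {
	JobID   int64           `json:"job_id"`
	Payload json.RawMessage `json:"payload"`
}

// writeJobs streams jobs as gzip-compressed JSON lines, encoding each job
// directly into the gzip writer instead of marshalling to an intermediate
// byte slice first.
func writeJobs(path string, jobs []Job) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	gz := gzip.NewWriter(f)
	enc := json.NewEncoder(gz) // Encode appends a newline after each object
	for _, job := range jobs {
		if err := enc.Encode(job); err != nil {
			f.Close()
			return err
		}
	}
	if err := gz.Close(); err != nil { // flush the gzip stream before the file
		f.Close()
		return err
	}
	return f.Close() // check Close: it may surface deferred write errors
}
```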


fileUploader, err := w.storageProvider.GetFileManager(workspaceID)
if err != nil {
	log.Errorw("Skipping storing errors since no file manager is found", "error", err)
Member

Under which conditions is this error possible? Will we get error logs if a customer has not configured backups?

Member Author

@Sidddddarth Sidddddarth Aug 16, 2023


Under which conditions is this error possible?

This particular scenario can only happen when the workspace is not part of the config this server instance serves.
The logs will contain the workspaceID, if that's what you're referring to.

Member Author


If it reaches this point, these jobs will be marked as aborted in the next iteration.

Contributor

Can we use a sentinel error to make this error path more straightforward?
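
A hedged sketch of what that could look like; ErrNoFileManager and the types below are hypothetical stand-ins, not the PR's actual identifiers:

```go
package archiver

import (
	"errors"
	"fmt"
)

// ErrNoFileManager is a hypothetical sentinel meaning "this workspace has no
// usable file manager".
var ErrNoFileManager = errors.New("no file manager for workspace")

type fileManager interface{ Upload(localPath string) error }

type storageProvider interface {
	GetFileManager(workspaceID string) (fileManager, error)
}

type worker struct{ storageProvider storageProvider }

func (w *worker) uploaderFor(workspaceID string) (fileManager, error) {
	fm, err := w.storageProvider.GetFileManager(workspaceID)
	if err != nil {
		// wrap the sentinel so callers can branch with errors.Is
		return nil, fmt.Errorf("workspace %s: %w: %v", workspaceID, ErrNoFileManager, err)
	}
	return fm, nil
}

// Callers can then handle this path explicitly instead of only logging:
//
//	if errors.Is(err, ErrNoFileManager) {
//		// skip the upload; jobs get marked aborted on the next iteration
//	}
```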

Member

@lvrach lvrach left a comment


The only blocking request is checking the file.Close() error:
https://www.joeshaw.org/dont-defer-close-on-writable-files/

if err != nil {
	return "", fmt.Errorf("failed to open file %s: %w", path, err)
}
defer func() { _ = file.Close() }()
Member

[blocking] It is not a good idea to ignore the error from file.Close(): during close we might need to flush data to disk, and if that fails, data will be lost. Given the criticality of these backups, we need to be extra careful.
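
For the write path, the pattern from the linked article looks roughly like this; writeArchive is a hypothetical helper, not the PR's code:

```go
package archiver

import "os"

// writeArchive surfaces the Close error instead of dropping it, since Close
// may flush buffered data to disk and a failed Close can mean lost data.
func writeArchive(path string, data []byte) (err error) {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := f.Close(); cerr != nil && err == nil {
			err = cerr // report Close failures unless a write already failed
		}
	}()
	_, err = f.Write(data)
	return err
}
```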

Contributor

We are only opening the file for reading and sending its contents to the object storage.
If this operation fails, the jobs will not be marked as succeeded and the archiver will retry, i.e. a new file will be created containing the same jobs. What should we do if closing a file that we opened for reading fails here?

Member

My bad, with the various comments the order of events was unclear. Not a blocking comment, but if marshalJob fails we don't close the gzWriter.

@lvrach lvrach requested a review from cisse21 August 16, 2023 09:13

jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithJobMaxAge(
Contributor

Note for future improvement: the jobsdb.HandleT#doCleanup method picks up jobs in the executing state as well.

This means that both of the following are theoretically possible:

  • Duplicate terminal states are recorded for a job, one by the cleanup goroutine and another by the goroutine that marked the job as executing.
  • The cleanup goroutine marks a job as aborted while the other goroutine marks it as failed.

Thus we might need to revisit this in the future, keeping the following in mind (see the sketch after this list):

  1. Dealing with jobs in the executing state
  2. Keeping pending events consistent
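
A purely illustrative sketch of one mitigation for point 1, with hypothetical types, table, and column names (this is not jobsdb's actual schema or API): have the max-age cleanup skip jobs another goroutine has recently marked executing, so only one goroutine ever writes a terminal state per job.

```go
package cleanup

import (
	"database/sql"
	"time"
)

type cleanupHandle struct {
	db             *sql.DB
	maxAge         time.Duration
	executingGrace time.Duration // how long an executing claim is honoured
}

// pickExpiredJobs selects jobs past maxAge, excluding jobs with a fresh
// executing status, to avoid racing the owning goroutine into a duplicate
// or conflicting terminal state.
func (h *cleanupHandle) pickExpiredJobs() ([]int64, error) {
	rows, err := h.db.Query(`
		SELECT j.job_id
		FROM jobs j
		WHERE j.created_at < now() - ($1 * interval '1 second')
		AND NOT EXISTS (
			SELECT 1 FROM job_status s
			WHERE s.job_id = j.job_id
			  AND s.job_state = 'executing'
			  AND s.exec_time > now() - ($2 * interval '1 second'))`,
		h.maxAge.Seconds(), h.executingGrace.Seconds())
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var ids []int64
	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	return ids, rows.Err()
}
```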

@Sidddddarth Sidddddarth force-pushed the feat.archiver branch 2 times, most recently from cf09e34 to ea4d09e on August 17, 2023 05:46