Turns on the history scavenger for SQL backends #3462
- func (s *Service) startScanner() {
-     sc := scanner.New(
+ func (s *Service) initScanner() {
+     s.scanner = scanner.New(
What is this change for?
This allows us to close the scanner when the service closes (which waits for all of its coroutines to finish)
          go s.startWorkflowWithRetry(historyScannerWFStartOptions, historyScannerWFTypeName)
          workerTaskQueueNames = append(workerTaskQueueNames, historyScannerTaskQueueName)
      }

      for _, tl := range workerTaskQueueNames {
-         work := worker.New(s.context.sdkSystemClient, tl, workerOpts)
+         work := s.context.workerFactory.New(s.context.sdkSystemClient, tl, workerOpts)
what is the difference here?
Injecting a worker factory here allows me to test which workers were created. If I just use worker.New, it asserts that the worker is being created by a real SDK client.
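To illustrate the factory-injection idea described above, here is a minimal sketch. The `Worker` and `WorkerFactory` interfaces, the `recordingFactory` fake, and the task queue names are all hypothetical stand-ins; the PR's actual `workerFactory` takes an SDK client and worker options and may look quite different.

```go
package main

import "fmt"

// Worker is a hypothetical stand-in for the SDK worker type.
type Worker interface {
	Start() error
}

// WorkerFactory is a hypothetical interface; the PR's real factory differs.
type WorkerFactory interface {
	New(taskQueue string) Worker
}

// fakeWorker does nothing; it only satisfies the Worker interface.
type fakeWorker struct{}

func (fakeWorker) Start() error { return nil }

// recordingFactory remembers every task queue a worker was requested for.
type recordingFactory struct {
	taskQueues []string
}

func (f *recordingFactory) New(taskQueue string) Worker {
	f.taskQueues = append(f.taskQueues, taskQueue)
	return fakeWorker{}
}

// startWorkers creates and starts one worker per task queue via the factory.
func startWorkers(factory WorkerFactory, taskQueues []string) error {
	for _, tq := range taskQueues {
		if err := factory.New(tq).Start(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	f := &recordingFactory{}
	// The task queue names below are made up for the example.
	if err := startWorkers(f, []string{"history-scanner-tq", "task-queue-scanner-tq"}); err != nil {
		panic(err)
	}
	fmt.Println(f.taskQueues)
}
```

A test can then assert on `f.taskQueues` without needing a real SDK client.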
      }

      // Scanner is the background sub-system that does full scans
      // of database tables to cleanup resources, monitor anomalies
      // and emit stats for analytics
      Scanner struct {
          context scannerContext
+         wg      sync.WaitGroup
what is the benefit to have this wg?
Some of my assertions are for side-effects in goroutines. If I don't wait for them to finish, the test could be flaky due to race conditions.
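A rough sketch of the WaitGroup pattern being discussed, assuming a much simplified `Scanner`: the `started` counter, the `Started` accessor, and the workflow names are hypothetical and exist only to give the goroutines a side effect to assert on.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Scanner is a simplified, hypothetical version of the real struct.
type Scanner struct {
	wg      sync.WaitGroup
	started int32 // incremented by each goroutine; hypothetical side effect
}

// Start launches one goroutine per workflow name and tracks each in wg.
func (s *Scanner) Start(workflows []string) {
	for range workflows {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			atomic.AddInt32(&s.started, 1)
		}()
	}
}

// Stop blocks until every goroutine launched by Start has returned.
func (s *Scanner) Stop() {
	s.wg.Wait()
}

// Started reports how many goroutines have run their side effect.
func (s *Scanner) Started() int32 {
	return atomic.LoadInt32(&s.started)
}

func main() {
	s := &Scanner{}
	s.Start([]string{"history-scanner", "task-queue-scanner"})
	s.Stop() // after this returns, all side effects are visible
	fmt.Println(s.Started())
}
```

Because `Stop` waits on the WaitGroup, a test that calls it before asserting does not race with the goroutines, and the same call lets the owning service shut down cleanly.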
service/worker/scanner/scanner.go
Outdated
      "go.temporal.io/server/common/backoff"
      "go.temporal.io/server/common/dynamicconfig"
      "go.temporal.io/server/common/log/tag"
  )

- const (
+ var (
what is the reason to use var?
This allows me to override the sleep duration to 0 during testing.
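As a sketch of why a `var` helps here: a package-level variable can be temporarily overridden in a test and restored afterwards, which a `const` does not allow. The 5-second value and the `startWithDelay` and `withDelay` helpers are assumptions made for illustration, not code from the PR.

```go
package main

import (
	"fmt"
	"time"
)

// scannerStartUpDelay is a var rather than a const so tests can override it.
// The 5-second value is illustrative, not taken from the PR.
var scannerStartUpDelay = 5 * time.Second

// startWithDelay sleeps for the configured delay and then runs start.
func startWithDelay(start func()) {
	time.Sleep(scannerStartUpDelay)
	start()
}

// withDelay temporarily overrides the delay, runs fn, then restores it.
func withDelay(d time.Duration, fn func()) {
	defer func(original time.Duration) {
		scannerStartUpDelay = original
	}(scannerStartUpDelay) // argument is evaluated here, before the override
	scannerStartUpDelay = d
	fn()
}

func main() {
	began := time.Now()
	withDelay(0, func() {
		startWithDelay(func() { fmt.Println("started") })
	})
	// The run took well under the normal delay, and the original value is back.
	fmt.Println(time.Since(began) < time.Second, scannerStartUpDelay)
}
```

Mutable package-level state like this is not safe to override from tests that run in parallel, which is one argument for removing the delay altogether.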
maybe we can remove this delay and leverage the retry
Going to rerun the CI with that idea
The CI does pass with this removed. I think it's probably safe to do
defer func(originalDelay time.Duration) {
    scannerStartUpDelay = originalDelay
}(scannerStartUpDelay)
will this work after you set the delay to 0?
Yes. The arguments to a deferred function are evaluated when the defer statement executes, not when the deferred call actually runs. For example:
    package main

    func main() {
        x := 0
        defer func(x int) {
            println(x) // prints 0
        }(x)
        x = 42
    }
e374a0b to e4fd9b5
What changed?
In this PR, I enabled the history scanner to delete unused history tree branches. In addition, I removed an else that made it impossible for the history scanner and the task queue scanner to run simultaneously (the task queue scanner would take priority).
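The effect of removing the else can be sketched as follows. The `config` struct, its field names, and the exact shape of the old branch are assumptions made for illustration; the real checks depend on the scanner's configuration and persistence backend.

```go
package main

import "fmt"

// config holds hypothetical feature flags for the two scanners.
type config struct {
	taskQueueScannerEnabled bool
	historyScannerEnabled   bool
}

// scannersBefore mirrors an if/else shape: the history scanner is only
// considered when the task queue scanner is off.
func scannersBefore(c config) []string {
	var out []string
	if c.taskQueueScannerEnabled {
		out = append(out, "task-queue-scanner")
	} else if c.historyScannerEnabled {
		out = append(out, "history-scanner")
	}
	return out
}

// scannersAfter checks each flag independently, so both can start.
func scannersAfter(c config) []string {
	var out []string
	if c.taskQueueScannerEnabled {
		out = append(out, "task-queue-scanner")
	}
	if c.historyScannerEnabled {
		out = append(out, "history-scanner")
	}
	return out
}

func main() {
	both := config{taskQueueScannerEnabled: true, historyScannerEnabled: true}
	fmt.Println(scannersBefore(both)) // only the task queue scanner
	fmt.Println(scannersAfter(both))  // both scanners
}
```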
Why?
I made these changes for #3419, to rule out unscavenged history branches as a cause of the steadily growing disk usage on SQL backends.
How did you test it?
I added a new test to the Scanner which verifies that we actually start the scavenger now regardless of the storage backend we're using.
Potential risks
The biggest risk I can think of is that this change could turn on the scavenger for NoSQL backends, and that the scavenger could have a bug that causes it to remove branches it shouldn't, which could interfere with workflow progress and lead to data loss.
Is hotfix candidate?
This PR is not a hotfix candidate that requires a notification to the broader community. However, we should tell the people on the original PR that this fix is in, so that they can patch it and see if it fixes their issue.