-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement close notifier and timeout on executors #4889
Conversation
@@ -330,6 +331,7 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { | |||
if res.Err != nil { | |||
return res.Err | |||
} | |||
close(closing) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you move the close(closing)
right after the creation (L320) and do it as a defer. That way it'll handle it for panics too. If it closes and the results are done reading the it shouldn't do anything anyway.
a7bb47a
to
24e05df
Compare
break | ||
case <-time.After(30 * time.Second): | ||
// This should never happen, so if it does, it is a problem | ||
out <- &models.Row{Err: fmt.Errorf("execute was closed by caller")} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error text still looks wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ahh, I missed this one. will fix.
@corylanou -- can you scrub the error messages? We can't differentiate between cancellation and timeout right now. |
24e05df
to
be488b7
Compare
@otoolep fixed the duplicated error messages. |
OK, seems good now. I never like to see magic numbers in the code, and if nothing else I would think the 30 seconds value should be |
Implement close notifier and timeout on executors
func (e *ShowMeasurementsExecutor) Execute() <-chan *models.Row { | ||
func (e *ShowMeasurementsExecutor) Execute(closing <-chan struct{}) <-chan *models.Row { | ||
// It's important that all resources are released when execution completes. | ||
e.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think e.close()
could not be invoked here.
Please ref issue #4926 and pr #4927
Thanks! @corylanou
This PR does two things:
CloseNotifier
implementation for our Query handler endpoint. This will pass in a closing channel to the executors. If the HTTP client disconnects, we will cancel the query on the server.