add backpressure when all connections are dead #698

Closed
rwynn wants to merge 6 commits

Conversation

rwynn
Contributor

@rwynn rwynn commented Jan 31, 2018

I've noticed that if I have a BulkProcessor with a short FlushInterval and all connections to Elasticsearch become dead, bad things happen. Since the BulkService requests slice is only reset after a successful commit, the short flush interval causes the Processor to bang the same set of requests over and over again against dead connections. Additionally, without any back pressure, producers can potentially keep adding fuel to the fire by adding to the requests slice of the BulkService.

This PR is an attempt to add some relief in this scenario. I think it can possibly be enhanced to only trigger liveness checks after errors of "type connection".

Originally, I was looking into why requests were being repeated so often on failure when I pulled the plug on Elasticsearch. My retrier was set to a StopRetrier, so something was fishy. Then I realized that the request state doesn't get cleared on failure and my FlushInterval is less than a second.

This PR can be tested by starting a BulkProcessor with a sub-second FlushInterval, adding some BulkableRequests, then stopping Elasticsearch, and finally adding a few more BulkableRequests.
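For illustration, a minimal reproduction sketch along those lines (the URL, index name, document payload, and loop timing are assumptions, not taken from the PR):

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/olivere/elastic"
)

func main() {
	// Assumes a local node at the default URL; stop Elasticsearch mid-run.
	client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
	if err != nil {
		log.Fatal(err)
	}

	// Sub-second FlushInterval, as described above.
	p, err := client.BulkProcessor().
		Name("repro").
		Workers(1).
		FlushInterval(500 * time.Millisecond).
		Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	// Add a few requests, stop Elasticsearch, then keep adding.
	for i := 0; i < 60; i++ {
		doc := map[string]interface{}{"n": i}
		p.Add(elastic.NewBulkIndexRequest().
			Index("repro").Type("doc").Id(fmt.Sprintf("%d", i)).Doc(doc))
		time.Sleep(time.Second)
	}
}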

@olivere
Owner

olivere commented Jan 31, 2018

Thanks for taking a deep dive into this issue. I will have to review this carefully, so it might take a while. Busy times at my day job...

@olivere olivere added the bug label Jan 31, 2018
Allow the user to configure the deadline after which reconnection
attempts for a network error are aborted.  Abort reconnection attempts if
an interrupt signal is detected.  Only start the reconnection process if
the error implements the net.Error interface.
@@ -545,6 +545,7 @@ func (w *bulkWorker) waitForActiveConnection() {
deadline := time.NewTimer(w.p.stopRecon)
interrupt := make(chan os.Signal, 1)
Owner

What are you trying to achieve with signals in Elastic?

Contributor Author

@rwynn rwynn Feb 10, 2018

In my test case I have a main process that is listening on an event source and using these events to add work to an Elastic bulk processor. This main process also listens for Ctrl-C signals in another goroutine and, in that case, exits the event loop and calls Close on the bulk processor.
Now, the reason I added signals to this PR is that when the processor enters this new waiting state, calls to both Add and Close on it will block. Thus in my test case, with Elasticsearch down, I could no longer quit the main process with Ctrl-C, since that calls Close(), which blocks until the deadline. So I needed a way for waitForActiveConnection to exit without needing to restart the Elasticsearch server.
Maybe I could have achieved this by adding another channel to the bulk processor and pushing a message to it when Close() was called.
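A hedged sketch of the waiting state described here, following the shape hinted at in the diff above (deadline timer plus an interrupt channel); `hasActiveConnection` is a stand-in for the processor's real liveness check, and the one-second poll interval is an assumption:

package sketch

import (
	"os"
	"os/signal"
	"time"
)

// waitForActiveConnection blocks until a connection is alive again, the
// configured reconnection deadline expires, or Ctrl-C is pressed.
// hasActiveConnection is a placeholder for the real connectivity check.
func waitForActiveConnection(stopRecon time.Duration, hasActiveConnection func() bool) {
	deadline := time.NewTimer(stopRecon)
	defer deadline.Stop()

	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	defer signal.Stop(interrupt)

	for {
		select {
		case <-interrupt:
			return // Ctrl-C: give up waiting so Close() can proceed
		case <-deadline.C:
			return // deadline reached: stop trying to reconnect
		case <-time.After(time.Second):
			if hasActiveConnection() {
				return // a connection came back; resume committing
			}
		}
	}
}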

Owner

I see. Then we need to change Close so that it doesn't block, or at least times out, without relying on signals.

I see your use case, but I think there should be a more general mechanism (besides Ctrl-C) to stop/close the bulk processor cleanly. Signals are typically used in console-based apps, and I don't see a good reason to rely on them outside of main.
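For comparison, a sketch of the arrangement suggested here, with signal handling kept in the application's main rather than in the library (processor options and names are illustrative):

package main

import (
	"context"
	"log"
	"os"
	"os/signal"

	"github.com/olivere/elastic"
)

func main() {
	client, err := elastic.NewClient()
	if err != nil {
		log.Fatal(err)
	}
	p, err := client.BulkProcessor().Name("worker").Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	// The application, not the library, listens for Ctrl-C.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt)

	go func() {
		// ... event loop: read from the event source and call p.Add(...) ...
	}()

	<-quit
	// For this to work, Close must not block indefinitely on dead
	// connections -- which is exactly the point under discussion.
	if err := p.Close(); err != nil {
		log.Println("close:", err)
	}
}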

@olivere
Owner

olivere commented Feb 10, 2018

I have added a recipe for testing the bulk processor under recipes/bulk_processor; it's meant for black-box testing this PR.

If you're using this with Elastic before this PR, it looks like this (with the number of queued requests slowly increasing):

Queued=    0 Written=    1319 Succeeded=       0 Failed=       0 Comitted=     0 Flushed=     1
Queued=  749 Written=    2857 Succeeded=     330 Failed=       0 Comitted=     1 Flushed=     1
Queued=    0 Written=    4303 Succeeded=    3549 Failed=       0 Comitted=     8 Flushed=     2
Queued=    0 Written=    5705 Succeeded=    4441 Failed=       0 Comitted=    12 Flushed=     3
Queued=    0 Written=    7078 Succeeded=    5892 Failed=       0 Comitted=    16 Flushed=     4
Queued=    0 Written=    8529 Succeeded=    7230 Failed=       0 Comitted=    20 Flushed=     5
Queued=    0 Written=    9873 Succeeded=    8671 Failed=       0 Comitted=    24 Flushed=     6
Queued=    0 Written=   11285 Succeeded=    9996 Failed=       0 Comitted=    28 Flushed=     7
Queued=    0 Written=   12808 Succeeded=   11431 Failed=       0 Comitted=    32 Flushed=     8
Queued= 1382 Written=   14266 Succeeded=   11431 Failed=       0 Comitted=    32 Flushed=     9
Queued= 4162 Written=   15593 Succeeded=   11431 Failed=       0 Comitted=    32 Flushed=    10
Queued= 4492 Written=   15924 Succeeded=   11431 Failed=       0 Comitted=    32 Flushed=    11
Queued= 4495 Written=   15927 Succeeded=   11431 Failed=       0 Comitted=    32 Flushed=    12
Queued= 4742 Written=   16174 Succeeded=   11431 Failed=       0 Comitted=    32 Flushed=    13
Queued= 4745 Written=   16177 Succeeded=   11431 Failed=       0 Comitted=    32 Flushed=    14
Queued= 4988 Written=   16420 Succeeded=   11431 Failed=       0 Comitted=    32 Flushed=    15
Queued= 4991 Written=   16423 Succeeded=   11431 Failed=       0 Comitted=    32 Flushed=    16
Queued= 5234 Written=   16666 Succeeded=   11431 Failed=       0 Comitted=    32 Flushed=    17
Queued= 5237 Written=   16669 Succeeded=   11431 Failed=       0 Comitted=    32 Flushed=    18

If this PR is applied, it looks like this (stopping ES after a few iterations, the number of queued requests in the workers no longer increases):

Queued=    0 Written=    1298 Succeeded=       0 Failed=       0 Comitted=     0 Flushed=     1
Queued=  325 Written=    2837 Succeeded=    1631 Failed=       0 Comitted=     4 Flushed=     2
Queued=    0 Written=    4201 Succeeded=    2956 Failed=       0 Comitted=     8 Flushed=     2
Queued=    0 Written=    5568 Succeeded=    4293 Failed=       0 Comitted=    12 Flushed=     3
Queued=    0 Written=    6977 Succeeded=    5675 Failed=       0 Comitted=    16 Flushed=     4
Queued=    0 Written=    8426 Succeeded=    7095 Failed=       0 Comitted=    20 Flushed=     6
Queued=    0 Written=    9811 Succeeded=    8529 Failed=       0 Comitted=    24 Flushed=     6
Queued=    0 Written=   11176 Succeeded=    9898 Failed=       0 Comitted=    28 Flushed=     7
Queued=  340 Written=   12557 Succeeded=   11286 Failed=       0 Comitted=    32 Flushed=     9
Queued=    0 Written=   14120 Succeeded=   12638 Failed=       0 Comitted=    36 Flushed=     9
Queued=  769 Written=   14126 Succeeded=   12638 Failed=       0 Comitted=    36 Flushed=    10
Queued=  769 Written=   14126 Succeeded=   12638 Failed=       0 Comitted=    36 Flushed=    10
Queued=  769 Written=   14126 Succeeded=   12638 Failed=       0 Comitted=    36 Flushed=    10
Queued=  769 Written=   14126 Succeeded=   12638 Failed=       0 Comitted=    36 Flushed=    10
Queued=  769 Written=   14126 Succeeded=   12638 Failed=       0 Comitted=    36 Flushed=    10
Queued=  769 Written=   14126 Succeeded=   12638 Failed=       0 Comitted=    36 Flushed=    10

So from the outside, it's looking good.

I have two more questions regarding this PR: will it ever give up retrying, or is it stuck in an infinite loop trying to commit? And why does it need to wait for an external SIGINT signal?

@olivere
Owner

olivere commented Feb 10, 2018

Hmm... there's something wrong after applying your patch. Simply run the bulk_processor recipe, then stop ES after ~10s. It starts well, then starts to enqueue again, behaving just like before:

$ go run main.go
Queued=    0 Written=    1358 Succeeded=       0 Failed=       0 Comitted=     0 Flushed=     0
Queued=    0 Written=    2839 Succeeded=    1723 Failed=       0 Comitted=     4 Flushed=     1
Queued=  385 Written=    4374 Succeeded=    2957 Failed=       0 Comitted=     8 Flushed=     3
Queued=    0 Written=    5788 Succeeded=    4487 Failed=       0 Comitted=    12 Flushed=     3
Queued=    0 Written=    7209 Succeeded=    5906 Failed=       0 Comitted=    16 Flushed=     4
Queued=    0 Written=    8606 Succeeded=    7337 Failed=       0 Comitted=    20 Flushed=     5
Queued=    0 Written=   10000 Succeeded=    8734 Failed=       0 Comitted=    24 Flushed=     7
Queued=    0 Written=   11407 Succeeded=   10111 Failed=       0 Comitted=    28 Flushed=     7
Queued=    0 Written=   12845 Succeeded=   11517 Failed=       0 Comitted=    32 Flushed=     8
Queued=    0 Written=   14208 Succeeded=   12969 Failed=       0 Comitted=    36 Flushed=     9
Queued=    0 Written=   15765 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    11
Queued= 1449 Written=   15767 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    11
Queued= 1449 Written=   15767 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    11
Queued= 1449 Written=   15767 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    11
Queued= 1449 Written=   15767 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    11
...
Queued= 1449 Written=   15767 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    11
Queued= 1449 Written=   15767 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    11
Queued= 1449 Written=   15767 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    11
Queued= 1449 Written=   15767 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    11
Queued= 1449 Written=   15767 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    11
Queued= 1453 Written=   16897 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    12
Queued= 2916 Written=   18243 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    14
Queued= 4246 Written=   18564 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    14
Queued= 4496 Written=   18814 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    16
Queued= 4500 Written=   18818 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    16
Queued= 4742 Written=   19060 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    17
Queued= 4745 Written=   19063 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    18
Queued= 4988 Written=   19306 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    19
Queued= 4991 Written=   19309 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    20
Queued= 5234 Written=   19552 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    21
Queued= 5237 Written=   19555 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    22
Queued= 5480 Written=   19798 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    24
Queued= 5483 Written=   19801 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    24
Queued= 5726 Written=   20044 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    26
Queued= 5729 Written=   20047 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    26
Queued= 5976 Written=   20294 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    27
Queued= 5979 Written=   20297 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    28
Queued= 6222 Written=   20540 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    29
Queued= 6225 Written=   20543 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    30
Queued= 6468 Written=   20786 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    31
Queued= 6471 Written=   20789 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    32
Queued= 6718 Written=   21036 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    33
Queued= 6721 Written=   21039 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    34
Queued= 6964 Written=   21282 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    36
Queued= 6967 Written=   21285 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    36
Queued= 7210 Written=   21528 Succeeded=   14317 Failed=       0 Comitted=    40 Flushed=    38

You need to let it run for a few minutes, though. Notice that I didn't start ES again. Can you please test this with your PR?

@rwynn
Contributor Author

rwynn commented Feb 10, 2018

Thanks for creating the recipe; that should really help figure out what's going on. I will take a look this afternoon and see if I can clean it up and remove the external signal check.

Wrt your questions, the PR is currently set up with a default 5-minute deadline, after which it gives up trying to find connections. The signal check was there so that someone who didn't want to wait out the deadline could press Ctrl-C to force it to stop trying immediately.

I hope to remove the signal check and instead use a read/write lock so that the external calls Stop/Close() and Flush() do not block on dead connections. They will do this by writing to a variable that tells the connection checkers to stop, and each connection checker will read that variable to decide whether to give up. After that, hopefully only Add() calls and the flushTicker will block on dead connections, while external Flush() and Stop/Close() will not.

@olivere
Copy link
Owner

olivere commented Feb 10, 2018

Sorry for not digging into the details yet. From what you describe, you want to tell a set of goroutines to exit when Close or Stop is called. What I typically use these days is a channel for signalling rather than a locking mechanism.

@rwynn
Contributor Author

rwynn commented Feb 10, 2018

Yes, closing a channel is idiomatic. I will try doing that to signal the connection checkers to stop.
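For reference, the idiom in its smallest form (names here are illustrative): closing a channel acts as a broadcast, so any number of goroutines blocked on it unblock at once, which is why it suits stopping several connection checkers.

package main

import (
	"fmt"
	"sync"
)

func main() {
	stop := make(chan struct{})
	var wg sync.WaitGroup

	// Several "connection checkers" all watch the same stop channel.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-stop // unblocks in every goroutine once stop is closed
			fmt.Println("checker", id, "stopped")
		}(i)
	}

	close(stop) // a single close notifies all watchers
	wg.Wait()
}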

Remove the use of os signals
Use a closed channel to notify connection checkers to stop
Keep checking until an active connection is found or the processor is stopped
@rwynn
Contributor Author

rwynn commented Feb 10, 2018

Refactored the PR. This is simpler and seems to work well against the test. Let me know whenever you get a chance to take a look. Thanks!

@olivere
Owner

olivere commented Feb 11, 2018

That's looking good. I've been testing the patch with the race detector enabled. It found one issue:

Queued=  373 Written=   53998 Succeeded=   52692 Failed=       0 Comitted=   150 Flushed=    39
==================
WARNING: DATA RACE
Write at 0x00c4201d60cc by goroutine 28:
  internal/race.Write()
      /Users/oliver/go/src/internal/race/race.go:41 +0x38
  sync.(*WaitGroup).Wait()
      /Users/oliver/go/src/sync/waitgroup.go:127 +0xf3
  github.com/olivere/elastic.(*bulkWorker).work()
      /Users/oliver/src/github.com/olivere/elastic/bulk_processor.go:448 +0xab

Previous read at 0x00c4201d60cc by goroutine 26:
  internal/race.Read()
      /Users/oliver/go/src/internal/race/race.go:37 +0x38
  sync.(*WaitGroup).Add()
      /Users/oliver/go/src/sync/waitgroup.go:70 +0x16e
  github.com/olivere/elastic.(*bulkWorker).commit()
      /Users/oliver/src/github.com/olivere/elastic/bulk_processor.go:520 +0x694
  github.com/olivere/elastic.(*bulkWorker).work()
      /Users/oliver/src/github.com/olivere/elastic/bulk_processor.go:468 +0x3cd

Goroutine 28 (running) created at:
  github.com/olivere/elastic.(*BulkProcessor).Start()
      /Users/oliver/src/github.com/olivere/elastic/bulk_processor.go:307 +0x359
  github.com/olivere/elastic.(*BulkProcessorService).Do()
      /Users/oliver/src/github.com/olivere/elastic/bulk_processor.go:159 +0x3af
  main.main()
      /Users/oliver/src/github.com/olivere/elastic/recipes/bulk_processor/main.go:83 +0x883

Goroutine 26 (running) created at:
  github.com/olivere/elastic.(*BulkProcessor).Start()
      /Users/oliver/src/github.com/olivere/elastic/bulk_processor.go:307 +0x359
  github.com/olivere/elastic.(*BulkProcessorService).Do()
      /Users/oliver/src/github.com/olivere/elastic/bulk_processor.go:159 +0x3af
  main.main()
      /Users/oliver/src/github.com/olivere/elastic/recipes/bulk_processor/main.go:83 +0x883
==================
Queued=  717 Written=   54004 Succeeded=   52692 Failed=       0 Comitted=   150 Flushed=    39

Can you reproduce that?

@rwynn
Contributor Author

rwynn commented Feb 11, 2018

Thanks, I'll take a look when I get a chance. At first glance, and after re-reading the Godoc, it looks like you should never call Wait before Add; I didn't realize that wasn't allowed. I think if I ensure all the new wg.Wait calls happen after the Add/go calls, this will go away. The behavior might be slightly different, as each worker would then independently find out about ES being down, so you would see each worker pausing before Add calls start blocking.
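The relevant rule from the sync documentation is that an Add which brings the counter up from zero must happen before Wait starts; a simplified illustration of the unsafe and safe orderings (not the processor's actual code):

package main

import "sync"

// racy sketches the pattern the race detector flagged: Wait may run
// concurrently with an Add that raises the counter from zero.
func racy() {
	var wg sync.WaitGroup
	go wg.Wait() // Wait starts while the counter may still be zero
	wg.Add(1)    // data race: Add from zero concurrent with Wait
	go wg.Done()
}

// safe performs the Add before starting the goroutine and before Wait.
func safe() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// ... commit work ...
	}()
	wg.Wait() // Wait only starts after Add has returned
}

func main() {
	safe()
}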

After a network error in a worker, subsequent workers may get an
ErrNoClient error, so check for that as well
olivere added a commit that referenced this pull request Feb 13, 2018
This commit does a final cleanup before merging the PR for adding
backpressure into the BulkProcessor (#698).

It switches from using `sync.WaitGroup` to a simple channel for
signalling. It also uses a `time.Ticker` instead of a tight loop for
periodically checking for active connections.
@olivere
Owner

olivere commented Feb 13, 2018

@rwynn Could you review my latest changes in c9e9d20? I've simply tried to clean up a bit here and there. The most significant change is using a channel for signalling instead of a sync.WaitGroup, and a time.Ticker instead of a tight loop. If that works for you, I think we're ready to go.
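A hedged sketch of the shape described for c9e9d20 (identifiers are illustrative, not the commit's exact names): a stop channel replaces the sync.WaitGroup, and a time.Ticker paces the connection checks instead of a tight loop.

package sketch

import "time"

// waitForActiveConnection keeps checking until a connection is available
// again or stopC is closed by Stop/Close. alive stands in for the real
// connectivity check.
func waitForActiveConnection(stopC <-chan struct{}, alive func() bool) {
	ticker := time.NewTicker(time.Second) // paced checks, not a tight loop
	defer ticker.Stop()

	for {
		select {
		case <-stopC: // the processor is being stopped or closed
			return
		case <-ticker.C:
			if alive() {
				return
			}
		}
	}
}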

@rwynn
Contributor Author

rwynn commented Feb 13, 2018

@olivere this looks really good to me. Check the comment I added to the code earlier that mentions the WaitGroup; it should be removed or reworded. Thanks very much for all your help with this.

@olivere
Owner

olivere commented Feb 13, 2018

Oh yeah, you mean I should remove this block, right?

What I find interesting is that if you keep poking, you often end up with much less code than you started with.

Anyway, I will release a new version later today.

Thanks so much for your persistence and being a good contributor to open source. 👍

@olivere olivere closed this in 9297f94 Feb 13, 2018
olivere added a commit that referenced this pull request Feb 13, 2018
This is a backport of #698 from v6 (9297f94).