
Make connection pool fair with respect to waiting threads. #6416

Merged
merged 1 commit on May 21, 2012

6 participants

pmahoney Steve Klabnik Aaron Patterson Rafael Mendonça França Yasuo Honda Jonathan Rochkind
pmahoney

Introduction: I'm using JRuby in a Java servlet container to run an application that uses ActiveRecord. This is a fully multithreaded environment where a thread pool services http requests, possibly in parallel. I've been having problems with timeouts when obtaining a database connection with only moderate concurrent requests.

(Reducing the size of the thread pool to match that of the connection pool may be one option, but I am running multiple apps in the servlet container all sharing one thread pool... So for now, the thread pool is 32 and the connection pool is 5).

I'm putting this patch out there to solicit comments. I think it needs more testing. I may have a chance this week to test in a real application, but for now I only have my two benchmark scripts ar_test.rb (for JRuby) and ar_test_mri.rb. Notably, I have not tested the behavior when connections are removed explicitly or removed by the reaper.

I do not understand the distinction between PoolFullError and ConnectionTimeoutError. I also introduced an @available array of connections ready to be checked out. At the time this seemed like a reasonable idea, but it required some inelegant additions (essentially everywhere @connections is modified, @available must be modified as well, e.g. #clear_reloadable_connections).

I apologize for the length of this report.

tl;dr: the fairness patch reduces outliers in the time to acquire a connection, but it needs review and testing.

The test suite passes, tested on Linux/Ruby-1.9.2-p290.

activerecord$ bundle exec rake test_sqlite3
Finished tests in 193.646555s, 17.1859 tests/s, 52.2653 assertions/s.
3328 tests, 10121 assertions, 0 failures, 0 errors, 12 skips

This patch makes the connection pool "fair" in the first-come, first-served sense. It also avoids using the Timeout class (which spawns a dedicated thread for each connection checkout attempt that needs to wait with a timeout).
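
For illustration, here is a minimal standalone sketch of that waiting pattern (my own simplified example, not the pool code itself; in the actual patch this logic lives in a private wait_until helper inside ConnectionPool#checkout):

require 'monitor'

# Simplified sketch: wait on a MonitorMixin condition variable with an
# explicit deadline instead of wrapping the wait in Timeout.timeout, which
# would otherwise spawn a watchdog thread for every waiting checkout.
class DeadlineWait
  include MonitorMixin

  def initialize
    super()           # initializes the monitor
    @cond = new_cond  # MonitorMixin::ConditionVariable
  end

  # Blocks until the block returns a truthy value or +timeout+ seconds elapse.
  def wait_until(timeout)
    synchronize do
      t0 = Time.now
      loop do
        value = yield
        return value if value

        remaining = timeout - (Time.now - t0)
        raise 'timed out' if remaining <= 0
        @cond.wait(remaining)   # releases the monitor; block is re-checked on wakeup
      end
    end
  end

  def signal
    synchronize { @cond.signal }
  end
end

Checking the elapsed time on every wakeup keeps the total wait bounded by the timeout, even if the condition variable is signaled spuriously or the desired resource is grabbed by another thread before the waiter runs.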

I've added two tests that fail before applying this patch. The first ensures the connection pool is fair by queuing up several waiting threads and ensuring they acquire connections in order as connections are made available. The connections are trickled in one by one because, when two connections become available at the same time, we don't actually care that the order is strictly preserved.

The second queues up two groups of waiting threads, then checks in enough connections for group 1 all at once. The test ensures that only the group 1 threads acquired connections.

A third test was the most telling one (the "money" test), but it was removed because it was unreliable. It attempted to test latency in a Java-servlet-like environment by setting up a thread pool and having each thread check connections in and out.

Instead, I used ar_test.rb to obtain histograms of the time per simulated request (checkout connection; sleep 0.01s; checkin connection).

Tests with ar_test.rb and JRuby 1.6.6

ActiveRecord 3.2.3, JRuby 1.6.6

$ bundle exec jruby --1.9 ar_test.rb
10000 reqs in 24.666 = 405.4 req/s
min/max/mean 0.01/2.37/0.08
histogram
[0.00,0.03) ooooooooooooooo
[0.03,0.05) o
[0.05,0.08) oooooooooooooooooooooooooooooooo
[0.08,0.10) ooooo
[0.10,0.13) o
[0.13,0.15) oooooo
[0.15,0.18) o
[0.18,0.20) oo
[0.20,0.22) o
[0.22,0.25) oo

ActiveRecord edge without fairness patch JRuby 1.6.6

Gave up after a long time. Many errors like this (note the time is much longer than the 5 s timeout):

took 120.067s to acquire and release
execution expired /home/pat/dev/ruby/rails/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb:106:in `await'

ActiveRecord edge with fairness patch, JRuby 1.6.6

$ bundle exec jruby --1.9 ar_test.rb
ActiveRecord::VERSION 4.0.0.beta
10000 reqs in 24.8 = 403.7 req/s
min/max/mean 0.06/1.48/0.08
histogram
[0.00,0.03) 
[0.03,0.05) 
[0.05,0.08) ooooooooooooooooooooooooooooooooooooooooooooooooo
[0.08,0.10) ooooooooooo
[0.10,0.13) o
[0.13,0.15) o
[0.15,0.18) o
[0.18,0.20) o
[0.20,0.22) o
[0.22,0.25) o

Tests with ar_test_mri.rb and Ruby 1.9.2-p290

ActiveRecord 3.2.3, Ruby 1.9.2-p290

Sometimes errors like this:

could not obtain a database connection within 5 seconds (waited 5.000125524999999 seconds). The max pool size is currently 5; consider increasing it. /home/pat/.rbenv/versions/1.9.2-p290/lib/ruby/gems/1.9.1/gems/activerecord-3.2.3/lib/active_record/connection_adapters/abstract/connection_pool.rb:248:in `block (2 levels) in checkout'

$ bundle exec ruby ar_test_mri.rb  # ruby 1.9.2-p290
ActiveRecord::VERSION::STRING 3.2.3
10000 reqs in 21.0 = 475.7 req/s
min/max/mean 0.01/4.74/0.07
histogram
[0.00,0.03) ooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
[0.03,0.05) o
[0.05,0.08) o
[0.08,0.10) 
[0.10,0.12) o
[0.12,0.15) o
[0.15,0.17) o
[0.17,0.20) o
[0.20,0.22) o
[0.22,0.25) ooo

ActiveRecord edge without fairness patch, Ruby 1.9.2-p290

Again, many "execution expired" errors.

ActiveRecord edge with fairness patch, Ruby 1.9.2-p290

$ bundle exec ruby ar_test_mri.rb   # ruby 1.9.2-p290
ActiveRecord::VERSION::STRING 4.0.0.beta
10000 reqs in 21.147658059 = 472.86559921202286 req/s
min/max/mean 0.01/0.12/0.07
histogram
[0.00,0.03) o
[0.03,0.05) o
[0.05,0.08) ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
[0.08,0.10) o
[0.10,0.12) o
[0.12,0.15) 
[0.15,0.17) 
[0.17,0.20) 
[0.20,0.22) 
[0.22,0.25) 

So in both Ruby 1.9.2 and JRuby, ActiveRecord 3.2.3 has perhaps a better median but more variation, with a few requests taking significantly longer than the others.

I'm only including ar_test_mri.rb here. The ar_test.rb script has only had cosmetic changes and is available in this other pull request: #6398

require 'active_record'
require 'thread'
require 'monitor'   # MonitorMixin, used to guard the shared times array

SHUTDOWN = Object.new.freeze    # sentinel used to shut down the worker threads

config = {
  'adapter' => 'sqlite3',
  'database' => ':memory:'
}

ActiveRecord::Base.establish_connection(config)
ActiveRecord::Base.clear_active_connections!

times = []
times.extend(MonitorMixin)

work = Queue.new

thread_pool_size = 32
nrequests = 10000

threads = (1..thread_pool_size).map do |i|
  Thread.new do
    while work.pop != SHUTDOWN
      t0 = Time.now
      begin
        ActiveRecord::Base.connection_pool.connection
        sleep 0.010
        ActiveRecord::Base.clear_active_connections!
      rescue => err
        puts "#{err.message} #{err.backtrace.join("\n\t")}"
      ensure
        times.synchronize { times << (Time.now - t0) }
      end
    end
  end
end

t0 = Time.now

nrequests.times { |i| work.push i }
thread_pool_size.times { work.push SHUTDOWN }
threads.each(&:join)

elapsed = Time.now - t0

puts "ActiveRecord::VERSION::STRING #{ActiveRecord::VERSION::STRING}"
puts "#{nrequests} reqs in #{elapsed} = #{nrequests/elapsed} req/s"
puts "min/max/mean %0.2f/%0.2f/%0.2f" % [times.min, times.max, times.reduce(&:+)/times.size]

def clamp(n, min, max)
  [[n, min].max, max].min
end

def hist(values, nbuckets = 10, min = values.min, max = values.max)
  step = (max - min) / nbuckets.to_f
  buckets = Array.new(nbuckets, 0)
  values.each do |v|
    i = clamp(((v - min) / step).floor, 0, nbuckets-1)
    buckets[i] += 1
  end

  start = min
  buckets.each do |count|
    print "[%0.2f,%0.2f) " % [start, start + step]
    ntics = (count.to_f / values.size * 60).ceil # 60 screen cols
    puts 'o' * ntics
    start += step
  end
end

puts 'histogram'
hist(times, 10, 0.00, 0.25)
Steve Klabnik
Collaborator

This is awesome. In general. Thanks for the detailed write-up. I'm sure @tenderlove and @jonleighton will want to check this out.

Aaron Patterson
Owner

Awesome! It looks great! :+1:

Aaron Patterson (tenderlove) merged commit d2901f0 on May 21, 2012
Aaron Patterson (tenderlove) closed this on May 21, 2012
Rafael Mendonça França
Owner

@pmahoney this commit broke the build with PostgreSQL. Could you take a look?

Rafael Mendonça França (rafaelfranca) referenced this pull request in a commit on May 22, 2012:
Revert "Merge pull request #6416 from pmahoney/threadsafe-connection-pool"

This reverts commit d2901f0, reversing
changes made to 525839f.

Conflicts:
	activerecord/test/cases/connection_pool_test.rb

Reason: This change broke the build (http://travis-ci.org/#!/rails/rails/builds/1391490)
and we don't have any solution until now. I asked the author to try to
fix it and open a new pull request.
7c69e2d
Rafael Mendonça França
Owner

I reverted d2901f0 since it broke the build a day ago and we don't have a solution yet. Please try to fix it and open a new pull request.

pmahoney

That's... interesting. I can reproduce that locally, so that's good. I refactored the code a bit, still get the same errors, but now this happens:

Run eager_test.rb alone, test passes:

$ ARCONN=postgresql bundle exec ruby -Itest test/cases/associations/eager_test.rb 
Finished tests in 4.363282s, 31.3984 tests/s, 93.9660 assertions/s.
137 tests, 410 assertions, 0 failures, 0 errors, 0 skips

Run with connection_pool.rb alone, test passes:

$ ARCONN=postgresql bundle exec ruby -Itest test/cases/connection_pool_test.rb
Finished tests in 7.867991s, 2.1607 tests/s, 5.2110 assertions/s.
17 tests, 41 assertions, 0 failures, 0 errors, 0 skips

Run them together, tests pass:

$ ARCONN=postgresql bundle exec ruby -Itest -e '["test/cases/connection_pool_test.rb", "test/cases/associations/eager_test.rb"].each { |f| load f }'
Finished tests in 13.071680s, 11.7812 tests/s, 34.5021 assertions/s.
154 tests, 451 assertions, 0 failures, 0 errors, 0 skips

But run all the tests with the rake command, and I get those same failures.

Any ideas? I suppose I can try running larger and larger subsets of the tests to try to narrow it down... Or better, find out what rake is doing that plain ruby isn't.

(We've backported this patch to the 3.0 series, but this will be a blocker to us upgrading to 4.x, and I assume this will be true for many JRuby folks.)

Rafael Mendonça França
Owner

I have no clue right now. @yahonda tried to fix it as he described in issue #6443, but I don't think he has had success either. I'll try to investigate later too.

pmahoney

I've gotten the failing case down to this:

$ ARCONN=postgresql ruby -w -I"lib:test" -e '["test/cases/associations/eager_test.rb", "test/cases/associations/join_model_test.rb", "test/cases/base_test.rb"].each { |f| load f }'

This also fails with sqlite3. Going to poke around a bit more...

pmahoney

For what it's worth, that same command fails against master (cb92efb) with both sqlite3 and postgresql:

$ ruby -w -I"lib:test" -e '["test/cases/associations/eager_test.rb", "test/cases/associations/join_model_test.rb", "test/cases/base_test.rb"].each { |f| load f }'
Using sqlite3
/home/pat/.rbenv/versions/1.9.2-p290/lib/ruby/1.9.1/rexml/doctype.rb:255: warning: `+' after local variable is interpreted as binary operator
/home/pat/.rbenv/versions/1.9.2-p290/lib/ruby/1.9.1/rexml/doctype.rb:255: warning: even though it seems like unary operator
Run options: --seed 6037

# Running tests:

....................................................................................................................................................................................................................................................S.....................................................................FF.........................................................................................................

Finished tests in 8.663860s, 48.5927 tests/s, 129.8497 assertions/s.

  1) Failure:
test_eager_loading_with_conditions_on_join_model_preloads(EagerAssociationTest) [test/cases/associations/eager_test.rb:1015]:
3 instead of 2 queries were executed.
Queries:
SELECT "authors".* FROM "authors" INNER JOIN "posts" ON "posts"."author_id" = "authors"."id" INNER JOIN "comments" ON "comments"."post_id" = "posts"."id" WHERE (posts.title like 'Welcome%')
          SELECT name
          FROM sqlite_master
          WHERE type = 'table' AND NOT name = 'sqlite_sequence'
 AND name = "author_addresses"
SELECT "author_addresses".* FROM "author_addresses"  WHERE "author_addresses"."id" IN (1).
Expected: 2
  Actual: 3

  2) Failure:
test_eager_loading_with_conditions_on_joined_table_preloads(EagerAssociationTest) [test/cases/associations/eager_test.rb:977]:
6 instead of 2 queries were executed.
Queries:
          SELECT name
          FROM sqlite_master
          WHERE type = 'table' AND NOT name = 'sqlite_sequence'
 AND name = "taggings"
PRAGMA table_info("taggings")
          SELECT name
          FROM sqlite_master
          WHERE type = 'table' AND NOT name = 'sqlite_sequence'
 AND name = "tags"
PRAGMA table_info("tags")
SELECT "posts".* FROM "posts" INNER JOIN "taggings" ON "taggings"."taggable_id" = "posts"."id" AND "taggings"."taggable_type" = 'Post' INNER JOIN "tags" ON "tags"."id" = "taggings"."tag_id" WHERE (tags.name = 'General') ORDER BY posts.id
SELECT "authors".* FROM "authors"  WHERE "authors"."id" IN (1).
Expected: 2
  Actual: 6

I think the problem is unrelated to my patch. It seems the tests are making assumptions about what sort of table metadata has already been cached (sorry, I'm not too familiar with the inner workings of ActiveRecord). If I modify the test test_eager_loading_with_conditions_on_join_model_preloads to run the query-under-test once outside the assert_queries(2), then a second time within assert_queries(2), the test passes.
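
Roughly, the workaround looks like this (a hedged sketch, not the actual eager_test.rb code; the model, association, and condition are placeholders):

# Run the eager-loaded query once so the schema metadata lookups
# (sqlite_master / PRAGMA queries) are cached, then assert on the
# query count of a second, identical run.
def test_eager_loading_with_conditions_on_join_model_preloads
  run_query = lambda do
    Author.includes(:posts_with_comments)
          .where("posts.title like 'Welcome%'")
          .to_a
  end

  run_query.call                        # warm-up run populates the metadata cache
  assert_queries(2) { run_query.call }  # only the 2 expected queries remain
end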

Yasuo Honda
yahonda commented May 23, 2012

Thanks for the investigation. I agree that your patch is not the cause of these failures.
I've made a very temporary fix.

pmahoney

Thanks for looking into this! I was about to open an issue, but I'll hold off unless you think otherwise?

Yasuo Honda
yahonda commented May 23, 2012

Hi, I think these 2 failures should be discussed in a separate issue. Please go ahead to open a new one.

Jonathan Rochkind

Yep, this patch is fixing something we fixed slightly differently in 3-2-stable already, but master had diverged at that point, and it is something I'm running into too.

I don't completely understand the intended semantics of this patch, exactly what it's intended to fix and how. But let me know if I can help; I'm becoming increasingly familiar with ConnectionPool's concurrency semantics (somewhat sadly :) ).

Have you really managed to enforce "fairness" so that when threads are waiting on a connection, it's first-in/first-out? I can't quite figure out how your code does that, but I'm impressed if it does; I've been racking my brain and have been unable to come up with any way to do that with Ruby's own mutex semantics.

Please let me know if/how I can help get this resolved. (I'm interested in back-porting it to 3.2.x, even if only in my own personal monkey patches, once it's in master. I am having trouble with these issues in 3.2.)

pmahoney

@jrochkind it's much easier to see what's going on in this refactoring: pmahoney@ff1adc0. This is what I'm going to resubmit after a few more cleanups.

Take a look at the Queue class. Yes, it uses condition wait/signal. I have found (and have tests to prove it) that if you have, say, 10 threads waiting on a condition var and then do 10.times { cond.signal; sleep 1 }, the waiting threads will wake up in the order they started waiting (consistent with the documentation of ConditionVariable).

I have also found that if 5 threads start waiting, and later a second group of 5 starts waiting, then calling 5.times { cond.signal } (no sleep) wakes up the 5 threads in group1 and none in group2. The group1 threads will not necessarily wake up in order, but that's fine for this use case.
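
For reference, a minimal sketch of that wake-order experiment (an assumed harness, not the actual test code):

require 'monitor'

lock  = Monitor.new
cond  = lock.new_cond
order = []

threads = 10.times.map do |i|
  t = Thread.new do
    lock.synchronize do
      cond.wait      # releases the monitor while waiting
      order << i     # record wakeup order while holding the monitor
    end
  end
  Thread.pass until t.status == "sleep"   # make sure thread i waits before thread i+1
  t
end

10.times { lock.synchronize { cond.signal }; sleep 1 }
threads.each(&:join)

puts order.inspect   # per the behavior described above, this comes out in FIFO order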

(For what it's worth, I did implement a strictly fair queue where each waiting thread put itself into an Array and on wakeup, made sure it was first in line, or else re-signaled and began waiting again).

If that's true, then the way to prevent a new thread (one that has not been waiting) from "stealing" a connection from a thread that has been waiting is to check @num_waiting. When a thread is waiting, it increments this number, and that holds true even while it releases the lock during the call to #wait. I can easily modify the Queue class to allow "stealing", in which case the histogram of response times looks a lot different (there are quite a few requests that take a longish time to acquire a connection). I may blog about this and make some nicer graphs, because I found it interesting.
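
Condensed from the patch's ConnectionPool#checkout (see the diff below), the anti-stealing gate is roughly:

# Condensed excerpt, not runnable on its own: a caller may take a connection
# immediately only when no one is waiting; otherwise it joins the wait queue
# and cannot jump ahead of threads woken by a checkin.
def checkout
  synchronize do
    conn = acquire_connection if @num_waiting == 0        # fast path: no waiters
    conn ||= wait_until(@timeout) { acquire_connection }  # wait_until bumps @num_waiting
    conn.lease
    checkout_and_verify(conn)
  end
end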

Do those test results conflict with things you have seen?

Jonathan Rochkind

@pmahoney Well, hm. Here's what I've seen (and even have a test I could dig up to demonstrate).

  • thread1 wants a connection, and none are available so it waits.
  • Later another thread checks the connection back in, so it #signals.
  • thread1 wakes up -- inside the mutex -- but finds that there's still no connection available. Somehow, some other thread has grabbed it between the checkin/signal and the moment thread1 actually gets scheduled inside the mutex again.

Now, the thread that 'stole' the mutex out of order may not have been a thread that was actually waiting. It may be that in between the signal (when thread1 becomes 'runnable') and when thread1 is actually scheduled, some completely different thread (that was not previously waiting) manages to get scheduled, tries to do a checkout -- finds that, hey, there's a connection available (because thread1 hasn't been scheduled yet, so hasn't had a chance to take it), and grabs it without waiting at all. I believe that is in fact what was happening.

If your code doesn't solve that issue -- and I don't think it does -- then I'm not sure I understand what issue your code does solve vs. the state before your patch. Pre-your-patch, threads were already waiting on a ConditionVariable, and would presumably already get woken up in order, right? Although your test results certainly seem to show that things are improved, I just don't understand why/how. :) (Update q: Can you confirm that under your new code, as demonstrated in the tests, no thread will ever wait (much) longer than its timeout? That's the most alarming feature of master pre-patch; I think we should make sure that never happens -- better to have a thread time out because it had a connection 'stolen' than to have a thread that can wait with no upper bound without ever timing out.)

I had a test that sort of demonstrated this, but it was kind of a crazy hacky test -- given the sort of race condition it is, the test basically needed to run something like 100 threads, and maybe 5% of them or what have you would get their connection 'stolen'. The test demonstrated this under an old version of 3-2-stable, and it was demonstrated by some threads winding up raising the TimeoutException long before the @timeout value -- because they got woken up, but then found there was still no connection available, and in the code at that time, that would cause them to raise. But that exact test wouldn't demonstrate the problem against current 3-2-stable, because it's been patched so that when this happens the thread just goes back to sleep (without raising). But the underlying problem, of other threads coming in and stealing connections, is still there. The test would need to be rejiggered a bit to show that in current 3-2-stable or master (which works somewhat differently already) -- it's tricky to figure out how to demonstrate these kinds of race conditions, but I could try to work it out -- but I couldn't figure out any way to solve it.

Jonathan Rochkind

@pmahoney Ah, wait, I think I see how you've solved it.

In my case, the thread 'stealing' the connection, let's call it ThreadX, wasn't waiting at all. It managed to get scheduled, and do a checkout, in between the signal and when the first waiting thread was actually scheduled.

In your new fix, that ThreadX might still get scheduled in between the signal and when thread1 actually gets scheduled to get the connection it's due -- but it'll see that @num_waiting isn't zero, and go to sleep itself with a wait, going to the end of the line, instead of 'stealing' the connection.

Genius!

I still don't understand how your original patch, the one actually in this pull, managed to improve things -- although I believe your test results that it did, I just don't understand it!

But I understand how your new fix manages to improve things quite a bit with @num_waiting, nice! Once this gets into master, I'm really going to want to see if there's a way to backport it to 3-2-stable, to see if it solves my problems in my 3-2 app.

pmahoney

Resubmitted here: #6488


Showing 1 unique commit by 1 author.

May 20, 2012
pmahoney Make connection pool fair with respect to waiting threads. d06674d
activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb (120 changed lines)

@@ -2,7 +2,6 @@
 require 'monitor'
 require 'set'
 require 'active_support/core_ext/module/deprecation'
-require 'timeout'
 
 module ActiveRecord
   # Raised when a connection could not be obtained within the connection
@@ -92,21 +91,6 @@ def run
       attr_accessor :automatic_reconnect, :timeout
       attr_reader :spec, :connections, :size, :reaper
 
-      class Latch # :nodoc:
-        def initialize
-          @mutex = Mutex.new
-          @cond  = ConditionVariable.new
-        end
-
-        def release
-          @mutex.synchronize { @cond.broadcast }
-        end
-
-        def await
-          @mutex.synchronize { @cond.wait @mutex }
-        end
-      end
-
       # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
       # object which describes database connection information (e.g. adapter,
       # host name, username, password, etc), as well as the maximum size for
@@ -128,9 +112,25 @@ def initialize(spec)
         # default max pool size to 5
         @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
 
-        @latch = Latch.new
         @connections         = []
         @automatic_reconnect = true
+
+        # connections available to be checked out
+        @available = []
+
+        # number of threads waiting to check out a connection
+        @num_waiting = 0
+
+        # signal threads waiting
+        @cond = new_cond
+      end
+
+      # Hack for tests to be able to add connections.  Do not call outside of tests
+      def insert_connection_for_test!(c)
+        synchronize do
+          @connections << c
+          @available << c
+        end
       end
 
       # Retrieve the connection associated with the current thread, or call
@@ -188,6 +188,7 @@ def disconnect!
             conn.disconnect!
           end
           @connections = []
+          @available = []
         end
       end
 
@@ -202,6 +203,9 @@ def clear_reloadable_connections!
           @connections.delete_if do |conn|
             conn.requires_reloading?
           end
+          @available.delete_if do |conn|
+            conn.requires_reloading?
+          end
         end
       end
 
@@ -225,23 +229,19 @@ def clear_stale_cached_connections! # :nodoc:
       # Raises:
       # - PoolFullError: no connection can be obtained from the pool.
       def checkout
-        loop do
-          # Checkout an available connection
-          synchronize do
-            # Try to find a connection that hasn't been leased, and lease it
-            conn = connections.find { |c| c.lease }
-
-            # If all connections were leased, and we have room to expand,
-            # create a new connection and lease it.
-            if !conn && connections.size < size
-              conn = checkout_new_connection
-              conn.lease
-            end
+        synchronize do
+          conn = nil
+
+          if @num_waiting == 0
+            conn = acquire_connection
+          end
 
-            return checkout_and_verify(conn) if conn
+          unless conn
+            conn = wait_until(@timeout) { acquire_connection }
           end
 
-          Timeout.timeout(@timeout, PoolFullError) { @latch.await }
+          conn.lease
+          checkout_and_verify(conn)
         end
       end
 
@@ -257,8 +257,10 @@ def checkin(conn)
           end
 
           release conn
+
+          @available.unshift conn
+          @cond.signal
         end
-        @latch.release
       end
 
       # Remove a connection from the connection pool.  The connection will
@@ -266,12 +268,14 @@ def checkin(conn)
       def remove(conn)
         synchronize do
           @connections.delete conn
+          @available.delete conn
 
           # FIXME: we might want to store the key on the connection so that removing
           # from the reserved hash will be a little easier.
           release conn
+
+          @cond.signal          # can make a new connection now
         end
-        @latch.release
       end
 
       # Removes dead connections from the pool.  A dead connection can occur
@@ -283,12 +287,60 @@ def reap
           connections.dup.each do |conn|
             remove conn if conn.in_use? && stale > conn.last_use && !conn.active?
           end
+          @cond.broadcast       # may violate fairness
         end
-        @latch.release
       end
 
       private
 
+      # Take an available connection or, if possible, create a new
+      # one, or nil.
+      #
+      # Monitor must be held while calling this method.
+      #
+      # Returns: a newly acquired connection.
+      def acquire_connection
+        if @available.any?
+          @available.pop
+        elsif connections.size < size
+          checkout_new_connection
+        end
+      end
+
+      # Wait on +@cond+ until the block returns non-nil.  Note that
+      # unlike MonitorMixin::ConditionVariable#wait_until, this method
+      # does not test the block before the first wait period.
+      #
+      # Monitor must be held when calling this method.
+      #
+      # +timeout+: Integer timeout in seconds
+      #
+      # Returns: the result of the block
+      #
+      # Raises:
+      # - PoolFullError: timeout elapsed before +&block+ returned a connection
+      def wait_until(timeout, &block)
+        @num_waiting += 1
+        begin
+          t0 = Time.now
+          loop do
+            elapsed = Time.now - t0
+            if elapsed >= timeout
+              msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
+                [timeout, elapsed]
+              raise PoolFullError, msg
+            end
+
+            @cond.wait(timeout - elapsed)
+
+            conn = yield
+            return conn if conn
+          end
+        ensure
+          @num_waiting -= 1
+        end
      end
+
       def release(conn)
         thread_id = if @reserved_connections[current_connection_id] == conn
           current_connection_id
activerecord/test/cases/connection_adapters/abstract_adapter_test.rb (2 changed lines)

@@ -36,7 +36,7 @@ def test_expire_mutates_in_use
 
       def test_close
         pool = ConnectionPool.new(ConnectionSpecification.new({}, nil))
-        pool.connections << adapter
+        pool.insert_connection_for_test! adapter
         adapter.pool = pool
 
         # Make sure the pool marks the connection in use
activerecord/test/cases/connection_pool_test.rb (115 changed lines)

@@ -200,6 +200,121 @@ def test_checkout_behaviour
         end.join
       end
 
+      # The connection pool is "fair" if threads waiting for
+      # connections receive them the order in which they began
+      # waiting.  This ensures that we don't timeout one HTTP request
+      # even while well under capacity in a multi-threaded environment
+      # such as a Java servlet container.
+      #
+      # We don't need strict fairness: if two connections become
+      # available at the same time, it's fine of two threads that were
+      # waiting acquire the connections out of order.
+      #
+      # Thus this test prepares waiting threads and then trickles in
+      # available connections slowly, ensuring the wakeup order is
+      # correct in this case.
+      #
+      # Try a few times since it might work out just by chance.
+      def test_checkout_fairness
+        4.times { setup; do_checkout_fairness }
+      end
+
+      def do_checkout_fairness
+        expected = (1..@pool.size).to_a.freeze
+        # check out all connections so our threads start out waiting
+        conns = expected.map { @pool.checkout }
+        mutex = Mutex.new
+        order = []
+        errors = []
+
+        threads = expected.map do |i|
+          t = Thread.new {
+            begin
+              conn = @pool.checkout # never checked back in
+              mutex.synchronize { order << i }
+            rescue => e
+              mutex.synchronize { errors << e }
+            end
+          }
+          Thread.pass until t.status == "sleep"
+          t
+        end
+
+        # this should wake up the waiting threads one by one in order
+        conns.each { |conn| @pool.checkin(conn); sleep 0.1 }
+
+        threads.each(&:join)
+
+        raise errors.first if errors.any?
+
+        assert_equal(expected, order)
+      end
+
+      # As mentioned in #test_checkout_fairness, we don't care about
+      # strict fairness.  This test creates two groups of threads:
+      # group1 whose members all start waiting before any thread in
+      # group2.  Enough connections are checked in to wakeup all
+      # group1 threads, and the fact that only group1 and no group2
+      # threads acquired a connection is enforced.
+      #
+      # Try a few times since it might work out just by chance.
+      def test_checkout_fairness_by_group
+        4.times { setup; do_checkout_fairness_by_group }
+      end
+
+      def do_checkout_fairness_by_group
+        @pool.instance_variable_set(:@size, 10)
+        # take all the connections
+        conns = (1..10).map { @pool.checkout }
+        mutex = Mutex.new
+        successes = []    # threads that successfully got a connection
+        errors = []
+
+        make_thread = proc do |i|
+          t = Thread.new {
+            begin
+              conn = @pool.checkout # never checked back in
+              mutex.synchronize { successes << i }
+            rescue => e
+              mutex.synchronize { errors << e }
+            end
+          }
+          Thread.pass until t.status == "sleep"
+          t
+        end
+
+        # all group1 threads start waiting before any in group2
+        group1 = (1..5).map(&make_thread)
+        group2 = (6..10).map(&make_thread)
+
+        # checkin n connections back to the pool
+        checkin = proc do |n|
+          n.times do
+            c = conns.pop
+            @pool.checkin(c)
+          end
+        end
+
+        checkin.call(group1.size)         # should wake up all group1
+
+        loop do
+          sleep 0.1
+          break if mutex.synchronize { (successes.size + errors.size) == group1.size }
+        end
+
+        winners = mutex.synchronize { successes.dup }
+        checkin.call(group2.size)         # should wake up everyone remaining
+
+        group1.each(&:join)
+        group2.each(&:join)
+
+        assert_equal((1..group1.size).to_a, winners.sort)
+
+        if errors.any?
+          raise errors.first
+        end
+      end
+
       def test_automatic_reconnect=
         pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
         assert pool.automatic_reconnect