-
-
Notifications
You must be signed in to change notification settings - Fork 6.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into fix-error-message-in-delivery-worker
- Loading branch information
Showing
15 changed files
with
550 additions
and
75 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -148,3 +148,4 @@ group :production do | |
end | ||
|
||
gem 'concurrent-ruby', require: false | ||
gem 'connection_pool', require: false |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
# frozen_string_literal: true | ||
|
||
require 'connection_pool' | ||
require_relative './shared_timed_stack' | ||
|
||
class ConnectionPool::SharedConnectionPool < ConnectionPool | ||
def initialize(options = {}, &block) | ||
super(options, &block) | ||
|
||
@available = ConnectionPool::SharedTimedStack.new(@size, &block) | ||
end | ||
|
||
delegate :size, :flush, to: :@available | ||
|
||
def with(preferred_tag, options = {}) | ||
Thread.handle_interrupt(Exception => :never) do | ||
conn = checkout(preferred_tag, options) | ||
|
||
begin | ||
Thread.handle_interrupt(Exception => :immediate) do | ||
yield conn | ||
end | ||
ensure | ||
checkin(preferred_tag) | ||
end | ||
end | ||
end | ||
|
||
def checkout(preferred_tag, options = {}) | ||
if ::Thread.current[key(preferred_tag)] | ||
::Thread.current[key_count(preferred_tag)] += 1 | ||
::Thread.current[key(preferred_tag)] | ||
else | ||
::Thread.current[key_count(preferred_tag)] = 1 | ||
::Thread.current[key(preferred_tag)] = @available.pop(preferred_tag, options[:timeout] || @timeout) | ||
end | ||
end | ||
|
||
def checkin(preferred_tag) | ||
if ::Thread.current[key(preferred_tag)] | ||
if ::Thread.current[key_count(preferred_tag)] == 1 | ||
@available.push(::Thread.current[key(preferred_tag)]) | ||
::Thread.current[key(preferred_tag)] = nil | ||
else | ||
::Thread.current[key_count(preferred_tag)] -= 1 | ||
end | ||
else | ||
raise ConnectionPool::Error, 'no connections are checked out' | ||
end | ||
|
||
nil | ||
end | ||
|
||
private | ||
|
||
def key(tag) | ||
:"#{@key}-#{tag}" | ||
end | ||
|
||
def key_count(tag) | ||
:"#{@key_count}-#{tag}" | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
# frozen_string_literal: true | ||
|
||
class ConnectionPool::SharedTimedStack | ||
def initialize(max = 0, &block) | ||
@create_block = block | ||
@max = max | ||
@created = 0 | ||
@queue = [] | ||
@tagged_queue = Hash.new { |hash, key| hash[key] = [] } | ||
@mutex = Mutex.new | ||
@resource = ConditionVariable.new | ||
end | ||
|
||
def push(connection) | ||
@mutex.synchronize do | ||
store_connection(connection) | ||
@resource.broadcast | ||
end | ||
end | ||
|
||
alias << push | ||
|
||
def pop(preferred_tag, timeout = 5.0) | ||
deadline = current_time + timeout | ||
|
||
@mutex.synchronize do | ||
loop do | ||
return fetch_preferred_connection(preferred_tag) unless @tagged_queue[preferred_tag].empty? | ||
|
||
connection = try_create(preferred_tag) | ||
return connection if connection | ||
|
||
to_wait = deadline - current_time | ||
raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0 | ||
|
||
@resource.wait(@mutex, to_wait) | ||
end | ||
end | ||
end | ||
|
||
def empty? | ||
size.zero? | ||
end | ||
|
||
def size | ||
@mutex.synchronize do | ||
@queue.size | ||
end | ||
end | ||
|
||
def flush | ||
@mutex.synchronize do | ||
@queue.delete_if do |connection| | ||
delete = !connection.in_use && (connection.dead || connection.seconds_idle >= RequestPool::MAX_IDLE_TIME) | ||
|
||
if delete | ||
@tagged_queue[connection.site].delete(connection) | ||
connection.close | ||
@created -= 1 | ||
end | ||
|
||
delete | ||
end | ||
end | ||
end | ||
|
||
private | ||
|
||
def try_create(preferred_tag) | ||
if @created == @max && !@queue.empty? | ||
throw_away_connection = @queue.pop | ||
@tagged_queue[throw_away_connection.site].delete(throw_away_connection) | ||
@create_block.call(preferred_tag) | ||
elsif @created != @max | ||
connection = @create_block.call(preferred_tag) | ||
@created += 1 | ||
connection | ||
end | ||
end | ||
|
||
def fetch_preferred_connection(preferred_tag) | ||
connection = @tagged_queue[preferred_tag].pop | ||
@queue.delete(connection) | ||
connection | ||
end | ||
|
||
def current_time | ||
Process.clock_gettime(Process::CLOCK_MONOTONIC) | ||
end | ||
|
||
def store_connection(connection) | ||
@tagged_queue[connection.site].push(connection) | ||
@queue.push(connection) | ||
end | ||
end |
Oops, something went wrong.