Skip to content

Commit

Permalink
Do better bookkeeping when tracking state in Thread.current
Browse files Browse the repository at this point in the history
This is largely just another cleanup patch, but does a couple main
things:

* Hoists the `last_response` value into thread state. This is a very
  minor nicety, but effectively makes `StripeClient` fully thread-safe,
  which seems like a minor nicety. Two calls to `#request` to the same
  `StripeObject` can now be executed on two different threads and their
  results won't interfere with each other.

* Moves state off one-off `Thread.current` keys and into a single one
  for the whole client which stores a new simple type of record called
  `ThreadContext`. Again, this doesn't change much, but adds some minor
  type safety and lets us document each field we expect to have in a
  thread's context.
  • Loading branch information
brandur committed Aug 19, 2019
1 parent 0abfc55 commit 03efb94
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 16 deletions.
80 changes: 69 additions & 11 deletions lib/stripe/stripe_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,23 @@ def initialize(connection_manager = nil)
@last_request_metrics = nil
end

# For internal use: sets a value on the current thread's store when
# Gets a currently active `StripeClient`. Set for the current thread when
# `StripeClient#request` is being run so that API operations being executed
# inside of that block can find the currently active client. It's reset to
# the original value (hopefully `nil`) after the block ends.
#
# For internal use only. Does not provide a stable API and may be broken
# with future non-major changes.
def self.active_client
Thread.current[:stripe_client] || default_client
current_thread_context.active_client || default_client
end

# Finishes any active connections by closing their TCP connection and
# clears them from internal tracking in all connection managers across all
# threads.
#
# For internal use only. Does not provide a stable API and may be broken
# with future non-major changes.
def self.clear_all_connection_managers
# Just a quick path for when configuration is being set for the first
# time before any connections have been opened. There is technically some
Expand All @@ -45,13 +51,13 @@ def self.clear_all_connection_managers

# A default client for the current thread.
def self.default_client
Thread.current[:stripe_client_default_client] ||=
current_thread_context.default_client ||=
StripeClient.new(default_connection_manager)
end

# A default connection manager for the current thread.
def self.default_connection_manager
Thread.current[:stripe_client_default_connection_manager] ||= begin
current_thread_context.default_connection_manager ||= begin
connection_manager = ConnectionManager.new

@all_connection_managers_mutex.synchronize do
Expand Down Expand Up @@ -121,15 +127,18 @@ def self.sleep_time(num_retries)
# charge, resp = client.request { Charge.create }
#
def request
@last_response = nil
old_stripe_client = Thread.current[:stripe_client]
Thread.current[:stripe_client] = self
old_stripe_client = self.class.current_thread_context.active_client
self.class.current_thread_context.active_client = self

self.class.current_thread_context.last_responses ||= {}
self.class.current_thread_context.last_responses[object_id] = nil

begin
res = yield
[res, @last_response]
[res, self.class.current_thread_context.last_responses[object_id]]
ensure
Thread.current[:stripe_client] = old_stripe_client
self.class.current_thread_context.active_client = old_stripe_client
self.class.current_thread_context.last_responses.delete(object_id)
end
end

Expand Down Expand Up @@ -196,8 +205,13 @@ def execute_request(method, path,
raise general_api_error(http_resp.code.to_i, http_resp.body)
end

# Allows StripeClient#request to return a response object to a caller.
@last_response = resp
# If being called from `StripeClient#request`, put the last response in
# thread-local memory so that it can be returned to the user. Don't store
# anything otherwise so that we don't leak memory.
if self.class.current_thread_context.last_responses&.key?(object_id)
self.class.current_thread_context.last_responses[object_id] = resp
end

[resp, api_key]
end

Expand Down Expand Up @@ -248,6 +262,50 @@ def execute_request(method, path,
}.freeze
private_constant :NETWORK_ERROR_MESSAGES_MAP

# A record representing any data that `StripeClient` puts into
# `Thread.current`. Making it a class likes this gives us a little extra
# type safety and lets us document what each field does.
#
# For internal use only. Does not provide a stable API and may be broken
# with future non-major changes.
class ThreadContext
# A `StripeClient` that's been flagged as currently active within a
# thread by `StripeClient#request`. A client stays active until the
# completion of the request block.
attr_accessor :active_client

# A default `StripeClient` object for the thread. Used in all cases where
# the user hasn't specified their own.
attr_accessor :default_client

# A default `ConnectionManager` for the thread. Normally shared between
# all `StripeClient` objects on a particular thread, and created so as to
# minimize the number of open connections that an application needs.
attr_accessor :default_connection_manager

# A temporary map of object IDs to responses from last executed API
# calls. Used to return a responses from calls to `StripeClient#request`.
#
# Stored in the thread data to make the use of a single `StripeClient`
# object safe across multiple threads. Stored as a map so that multiple
# `StripeClient` objects can run concurrently on the same thread.
#
# Responses are only left in as long as they're needed, which means
# they're removed as soon as a call leaves `StripeClient#request`, and
# because that's wrapped in an `ensure` block, they should never leave
# garbage in `Thread.current`.
attr_accessor :last_responses
end

# Access data stored for `StripeClient` within the thread's current
# context. Returns `ThreadContext`.
#
# For internal use only. Does not provide a stable API and may be broken
# with future non-major changes.
def self.current_thread_context
Thread.current[:stripe_client__internal_use_only] ||= ThreadContext.new
end

private def api_url(url = "", api_base = nil)
(api_base || Stripe.api_base) + url
end
Expand Down
91 changes: 86 additions & 5 deletions test/stripe/stripe_client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -826,22 +826,103 @@ class StripeClientTest < Test::Unit::TestCase

should "reset local thread state after a call" do
begin
Thread.current[:stripe_client] = :stripe_client
StripeClient.current_thread_context.active_client = :stripe_client

client = StripeClient.new
client.request {}

assert_equal :stripe_client, Thread.current[:stripe_client]
assert_equal :stripe_client,
StripeClient.current_thread_context.active_client
ensure
Thread.current[:stripe_client] = nil
StripeClient.current_thread_context.active_client = nil
end
end

should "correctly return last responses despite multiple clients" do
charge_resp = { object: "charge" }
coupon_resp = { object: "coupon" }

stub_request(:post, "#{Stripe.api_base}/v1/charges")
.to_return(body: JSON.generate(charge_resp))
stub_request(:post, "#{Stripe.api_base}/v1/coupons")
.to_return(body: JSON.generate(coupon_resp))

client1 = StripeClient.new
client2 = StripeClient.new

client2_resp = nil
_charge, client1_resp = client1.request do
Charge.create

# This is contrived, but we run one client nested in the `request`
# block of another one just to ensure that the parent is still
# unwinding when this goes through. If the parent's last response
# were to be overridden by this client (through a bug), then it would
# happen here.
_coupon, client2_resp = client2.request do
Coupon.create
end
end

assert_equal charge_resp, client1_resp.data
assert_equal coupon_resp, client2_resp.data
end

should "correctly return last responses despite multiple threads" do
charge_resp = { object: "charge" }
coupon_resp = { object: "coupon" }

stub_request(:post, "#{Stripe.api_base}/v1/charges")
.to_return(body: JSON.generate(charge_resp))
stub_request(:post, "#{Stripe.api_base}/v1/coupons")
.to_return(body: JSON.generate(coupon_resp))

client = StripeClient.new

# Poorly named class -- note this is actually a concurrent queue.
recv_queue = Queue.new
send_queue = Queue.new

# Start a thread, make an API request, but then idle in the `request`
# block until the main thread has been able to make its own API request
# and signal that it's done. If this thread's last response were to be
# overridden by the main thread (through a bug), then this routine
# should suss it out.
resp1 = nil
thread = Thread.start do
_charge, resp1 = client.request do
Charge.create

# Idle in `request` block until main thread signals.
send_queue.pop
end

# Signal main thread that we're done and it can run its checks.
recv_queue << true
end

# Make an API request.
_coupon, resp2 = client.request do
Coupon.create
end

# Tell background thread to finish `request`, then wait for it to
# signal back to us that it's ready.
send_queue << true
recv_queue.pop

assert_equal charge_resp, resp1.data
assert_equal coupon_resp, resp2.data

# And for maximum hygiene, make sure that our thread rejoins.
thread.join
end
end

context "#proxy" do
should "run the request through the proxy" do
begin
Thread.current[:stripe_client_default_connection_manager] = nil
StripeClient.current_thread_context.default_connection_manager = nil

Stripe.proxy = "http://user:pass@localhost:8080"

Expand All @@ -857,7 +938,7 @@ class StripeClientTest < Test::Unit::TestCase
ensure
Stripe.proxy = nil

Thread.current[:stripe_client_default_connection_manager] = nil
StripeClient.current_thread_context.default_connection_manager = nil
end
end
end
Expand Down

0 comments on commit 03efb94

Please sign in to comment.