Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion lib/ld-eventsource/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class Client
# an Array, it will be converted to JSON and sent as the request body. A string will be sent as a non-JSON
# request body. If payload responds to #call, it will be invoked on each
# request to generate the payload dynamically.
# @param retry_enabled [Boolean] (true) whether to retry connections after failures. If false, the client
# will exit after the first connection failure instead of attempting to reconnect.
# @yieldparam [Client] client the new client instance, before opening the connection
#
def initialize(uri,
Expand All @@ -102,9 +104,11 @@ def initialize(uri,
logger: nil,
socket_factory: nil,
method: "GET",
payload: nil)
payload: nil,
retry_enabled: true)
@uri = URI(uri)
@stopped = Concurrent::AtomicBoolean.new(false)
@retry_enabled = retry_enabled

@headers = headers.clone
@connect_timeout = connect_timeout
Expand Down Expand Up @@ -256,6 +260,8 @@ def run_stream
rescue StandardError => e
log_and_dispatch_error(e, "Unexpected error while closing stream")
end

return close unless @retry_enabled
end
end

Expand Down
86 changes: 86 additions & 0 deletions spec/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -737,4 +737,90 @@ def test_object.to_s
end
end
end

describe "retry parameter" do
it "defaults to true (retries enabled)" do
events_body = simple_event_1_text
with_server do |server|
attempt = 0
server.setup_response("/") do |req,res|
attempt += 1
if attempt == 1
res.status = 500
res.body = "server error"
res.keep_alive = false
else
send_stream_content(res, events_body, keep_open: true)
end
end

event_sink = Queue.new
error_sink = Queue.new
client = subject.new(server.base_uri, reconnect_time: reconnect_asap) do |c|
c.on_event { |event| event_sink << event }
c.on_error { |error| error_sink << error }
end

with_client(client) do |c|
expect(event_sink.pop).to eq(simple_event_1)
expect(error_sink.pop).to eq(SSE::Errors::HTTPStatusError.new(500, "server error"))
expect(attempt).to eq 2 # Should have retried
end
end
end

it "allows retries when retry_enabled: true" do
events_body = simple_event_1_text
with_server do |server|
attempt = 0
server.setup_response("/") do |req,res|
attempt += 1
if attempt == 1
res.status = 500
res.body = "server error"
res.keep_alive = false
else
send_stream_content(res, events_body, keep_open: true)
end
end

event_sink = Queue.new
error_sink = Queue.new
client = subject.new(server.base_uri, reconnect_time: reconnect_asap, retry_enabled: true) do |c|
c.on_event { |event| event_sink << event }
c.on_error { |error| error_sink << error }
end

with_client(client) do |c|
expect(event_sink.pop).to eq(simple_event_1)
expect(error_sink.pop).to eq(SSE::Errors::HTTPStatusError.new(500, "server error"))
expect(attempt).to eq 2 # Should have retried
end
end
end

it "disables retries when retry_enabled: false" do
with_server do |server|
attempt = 0
server.setup_response("/") do |req,res|
attempt += 1
res.status = 500
res.body = "server error"
res.keep_alive = false
end

error_sink = Queue.new
client = subject.new(server.base_uri, retry_enabled: false) do |c|
c.on_error { |error| error_sink << error }
end

# Give the client some time to attempt connection and fail
sleep(0.5)
client.close

expect(error_sink.pop).to eq(SSE::Errors::HTTPStatusError.new(500, "server error"))
expect(attempt).to eq 1 # Should not have retried
end
end
end
end