Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ld-eventsource.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'webrick', '~> 1.7'

spec.add_runtime_dependency 'concurrent-ruby', '~> 1.0'
spec.add_runtime_dependency 'http', '>= 4.4.1', '< 6.0.0'
spec.add_runtime_dependency 'http', '>= 4.4.1', '< 7.0.0'
end
44 changes: 31 additions & 13 deletions lib/ld-eventsource/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ class Client
# The default HTTP method for requests.
DEFAULT_HTTP_METHOD = "GET"

# TODO(breaking): Remove this filtering once we have updated to the next major breaking version.
# HTTP v6 requires keyword arguments instead of an options hash, so we filter to only known valid
# arguments to avoid passing unsupported options.
VALID_HTTP_CLIENT_OPTIONS = %i[
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsonbailey here is the approach we were discussing.

The only issue I found was that v4 and v5 support cookies as a parameter, but v6 dropped that. I don't include cookies in the list of allowable options since it would hard break in v6. However, I do log whenever we drop an option. Maybe that's a sufficient middle ground?

base_uri body encoding features follow form headers json keep_alive_timeout
nodelay params persistent proxy response retriable socket_class ssl_context
ssl ssl_socket_class timeout_class timeout_options
].freeze

#
# Creates a new SSE client.
#
Expand Down Expand Up @@ -118,15 +127,15 @@ def initialize(uri,
@retry_enabled = retry_enabled

@headers = headers.clone
@connect_timeout = connect_timeout
@read_timeout = read_timeout
@connect_timeout = connect_timeout&.to_f
@read_timeout = read_timeout&.to_f
@method = method.to_s.upcase
@payload = payload
@logger = logger || default_logger

base_http_client_options = {}
if socket_factory
base_http_client_options["socket_class"] = socket_factory
base_http_client_options[:socket_class] = socket_factory
end

if proxy
Expand All @@ -139,22 +148,29 @@ def initialize(uri,
end

if @proxy
base_http_client_options["proxy"] = {
base_http_client_options[:proxy] = {
:proxy_address => @proxy.host,
:proxy_port => @proxy.port,
}
base_http_client_options["proxy"][:proxy_username] = @proxy.user unless @proxy.user.nil?
base_http_client_options["proxy"][:proxy_password] = @proxy.password unless @proxy.password.nil?
base_http_client_options[:proxy][:proxy_username] = @proxy.user unless @proxy.user.nil?
base_http_client_options[:proxy][:proxy_password] = @proxy.password unless @proxy.password.nil?
end

options = http_client_options.is_a?(Hash) ? base_http_client_options.merge(http_client_options) : base_http_client_options
options = options.transform_keys(&:to_sym)
options = options.select do |key, _|
included = VALID_HTTP_CLIENT_OPTIONS.include?(key)
@logger.warn { "Ignoring unsupported HTTP client option: #{key}" } unless included
included
end

timeout_options = {}
timeout_options[:connect] = @connect_timeout if @connect_timeout
timeout_options[:read] = @read_timeout if @read_timeout

@http_client = HTTP::Client.new(options)
@http_client = HTTP::Client.new(**options)
.follow
.timeout({
read: read_timeout,
connect: connect_timeout,
})
@http_client = @http_client.timeout(timeout_options) unless timeout_options.empty?
@cxn = nil
@lock = Mutex.new

Expand Down Expand Up @@ -342,7 +358,7 @@ def connect
begin
uri = build_uri_with_query_params
@logger.info { "Connecting to event stream at #{uri}" }
cxn = @http_client.request(@method, uri, build_opts)
cxn = @http_client.request(@method, uri, **build_opts)
headers = cxn.headers
if cxn.status.code == 200
content_type = cxn.content_type.mime_type
Expand Down Expand Up @@ -390,8 +406,10 @@ def read_stream(cxn)
rescue HTTP::TimeoutError
# For historical reasons, we rethrow this as our own type
raise Errors::ReadTimeoutError.new(@read_timeout)
rescue EOFError
break
end
break if data.nil?
break if data.nil? # keep for v5 compat
gen.yield data
end
end
Expand Down
97 changes: 97 additions & 0 deletions spec/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,103 @@ def test_object.to_s
end
end

describe "http_client_options filtering" do
it "filters out unsupported options" do
with_server do |server|
server.setup_response("/") do |req,res|
send_stream_content(res, "", keep_open: true)
end

client = subject.new(server.base_uri,
http_client_options: {
"socket_class" => "MySocket",
"ssl" => { verify_mode: 0 },
"not_a_real_option" => "should be removed",
"another_fake" => 123,
})

http_client = client.instance_variable_get(:@http_client)
options = http_client.default_options

expect(options.socket_class).to eq("MySocket")
expect(options.ssl).to eq({ verify_mode: 0 })

client.close
end
end

it "filters out unsupported options provided as symbols" do
with_server do |server|
server.setup_response("/") do |req,res|
send_stream_content(res, "", keep_open: true)
end

client = subject.new(server.base_uri,
http_client_options: {
socket_class: "MySocket",
not_a_real_option: "should be removed",
})

http_client = client.instance_variable_get(:@http_client)
options = http_client.default_options

expect(options.socket_class).to eq("MySocket")

client.close
end
end

it "does not raise when only unsupported options are provided" do
with_server do |server|
server.setup_response("/") do |req,res|
send_stream_content(res, "", keep_open: true)
end

client = nil
expect {
client = subject.new(server.base_uri,
http_client_options: {
"totally_fake" => true,
"also_fake" => "yes",
})
}.not_to raise_error

client.close
end
end

it "preserves all valid options" do
with_server do |server|
server.setup_response("/") do |req,res|
send_stream_content(res, "", keep_open: true)
end

socket_factory = double("SocketFactory")
ssl_socket_factory = double("SSLSocketFactory")

client = subject.new(server.base_uri,
http_client_options: {
socket_class: socket_factory,
ssl_socket_class: ssl_socket_factory,
nodelay: true,
keep_alive_timeout: 30,
ssl: { verify_mode: 0 },
})

http_client = client.instance_variable_get(:@http_client)
options = http_client.default_options

expect(options.socket_class).to eq(socket_factory)
expect(options.ssl_socket_class).to eq(ssl_socket_factory)
expect(options.nodelay).to eq(true)
expect(options.keep_alive_timeout).to eq(30)
expect(options.ssl).to eq({ verify_mode: 0 })

client.close
end
end
end

describe "retry parameter" do
it "defaults to true (retries enabled)" do
events_body = simple_event_1_text
Expand Down
Loading