diff --git a/ld-eventsource.gemspec b/ld-eventsource.gemspec index ed6f5c2..348c4ed 100644 --- a/ld-eventsource.gemspec +++ b/ld-eventsource.gemspec @@ -28,5 +28,5 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'webrick', '~> 1.7' spec.add_runtime_dependency 'concurrent-ruby', '~> 1.0' - spec.add_runtime_dependency 'http', '>= 4.4.1', '< 6.0.0' + spec.add_runtime_dependency 'http', '>= 4.4.1', '< 7.0.0' end diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index ea5f1bf..bf6dbc8 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -52,6 +52,15 @@ class Client # The default HTTP method for requests. DEFAULT_HTTP_METHOD = "GET" + # TODO(breaking): Remove this filtering once we have updated to the next major breaking version. + # HTTP v6 requires keyword arguments instead of an options hash, so we filter to only known valid + # arguments to avoid passing unsupported options. + VALID_HTTP_CLIENT_OPTIONS = %i[ + base_uri body encoding features follow form headers json keep_alive_timeout + nodelay params persistent proxy response retriable socket_class ssl_context + ssl ssl_socket_class timeout_class timeout_options + ].freeze + # # Creates a new SSE client. # @@ -118,15 +127,15 @@ def initialize(uri, @retry_enabled = retry_enabled @headers = headers.clone - @connect_timeout = connect_timeout - @read_timeout = read_timeout + @connect_timeout = connect_timeout&.to_f + @read_timeout = read_timeout&.to_f @method = method.to_s.upcase @payload = payload @logger = logger || default_logger base_http_client_options = {} if socket_factory - base_http_client_options["socket_class"] = socket_factory + base_http_client_options[:socket_class] = socket_factory end if proxy @@ -139,22 +148,29 @@ def initialize(uri, end if @proxy - base_http_client_options["proxy"] = { + base_http_client_options[:proxy] = { :proxy_address => @proxy.host, :proxy_port => @proxy.port, } - base_http_client_options["proxy"][:proxy_username] = @proxy.user unless @proxy.user.nil? - base_http_client_options["proxy"][:proxy_password] = @proxy.password unless @proxy.password.nil? + base_http_client_options[:proxy][:proxy_username] = @proxy.user unless @proxy.user.nil? + base_http_client_options[:proxy][:proxy_password] = @proxy.password unless @proxy.password.nil? end options = http_client_options.is_a?(Hash) ? base_http_client_options.merge(http_client_options) : base_http_client_options + options = options.transform_keys(&:to_sym) + options = options.select do |key, _| + included = VALID_HTTP_CLIENT_OPTIONS.include?(key) + @logger.warn { "Ignoring unsupported HTTP client option: #{key}" } unless included + included + end + + timeout_options = {} + timeout_options[:connect] = @connect_timeout if @connect_timeout + timeout_options[:read] = @read_timeout if @read_timeout - @http_client = HTTP::Client.new(options) + @http_client = HTTP::Client.new(**options) .follow - .timeout({ - read: read_timeout, - connect: connect_timeout, - }) + @http_client = @http_client.timeout(timeout_options) unless timeout_options.empty? @cxn = nil @lock = Mutex.new @@ -342,7 +358,7 @@ def connect begin uri = build_uri_with_query_params @logger.info { "Connecting to event stream at #{uri}" } - cxn = @http_client.request(@method, uri, build_opts) + cxn = @http_client.request(@method, uri, **build_opts) headers = cxn.headers if cxn.status.code == 200 content_type = cxn.content_type.mime_type @@ -390,8 +406,10 @@ def read_stream(cxn) rescue HTTP::TimeoutError # For historical reasons, we rethrow this as our own type raise Errors::ReadTimeoutError.new(@read_timeout) + rescue EOFError + break end - break if data.nil? + break if data.nil? # keep for v5 compat gen.yield data end end diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 7a8bfe4..4c502ad 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -908,6 +908,103 @@ def test_object.to_s end end + describe "http_client_options filtering" do + it "filters out unsupported options" do + with_server do |server| + server.setup_response("/") do |req,res| + send_stream_content(res, "", keep_open: true) + end + + client = subject.new(server.base_uri, + http_client_options: { + "socket_class" => "MySocket", + "ssl" => { verify_mode: 0 }, + "not_a_real_option" => "should be removed", + "another_fake" => 123, + }) + + http_client = client.instance_variable_get(:@http_client) + options = http_client.default_options + + expect(options.socket_class).to eq("MySocket") + expect(options.ssl).to eq({ verify_mode: 0 }) + + client.close + end + end + + it "filters out unsupported options provided as symbols" do + with_server do |server| + server.setup_response("/") do |req,res| + send_stream_content(res, "", keep_open: true) + end + + client = subject.new(server.base_uri, + http_client_options: { + socket_class: "MySocket", + not_a_real_option: "should be removed", + }) + + http_client = client.instance_variable_get(:@http_client) + options = http_client.default_options + + expect(options.socket_class).to eq("MySocket") + + client.close + end + end + + it "does not raise when only unsupported options are provided" do + with_server do |server| + server.setup_response("/") do |req,res| + send_stream_content(res, "", keep_open: true) + end + + client = nil + expect { + client = subject.new(server.base_uri, + http_client_options: { + "totally_fake" => true, + "also_fake" => "yes", + }) + }.not_to raise_error + + client.close + end + end + + it "preserves all valid options" do + with_server do |server| + server.setup_response("/") do |req,res| + send_stream_content(res, "", keep_open: true) + end + + socket_factory = double("SocketFactory") + ssl_socket_factory = double("SSLSocketFactory") + + client = subject.new(server.base_uri, + http_client_options: { + socket_class: socket_factory, + ssl_socket_class: ssl_socket_factory, + nodelay: true, + keep_alive_timeout: 30, + ssl: { verify_mode: 0 }, + }) + + http_client = client.instance_variable_get(:@http_client) + options = http_client.default_options + + expect(options.socket_class).to eq(socket_factory) + expect(options.ssl_socket_class).to eq(ssl_socket_factory) + expect(options.nodelay).to eq(true) + expect(options.keep_alive_timeout).to eq(30) + expect(options.ssl).to eq({ verify_mode: 0 }) + + client.close + end + end + end + describe "retry parameter" do it "defaults to true (retries enabled)" do events_body = simple_event_1_text