diff --git a/ldclient-rb.gemspec b/ldclient-rb.gemspec index 1cd7b0a4..302c8d7d 100644 --- a/ldclient-rb.gemspec +++ b/ldclient-rb.gemspec @@ -30,5 +30,5 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency "net-http-persistent", "~> 2.9" spec.add_runtime_dependency "concurrent-ruby", "~> 1.0.0" spec.add_runtime_dependency "hashdiff", "~> 0.2" - spec.add_runtime_dependency "ld-em-eventsource", "~> 0.2" #https://github.com/launchdarkly/em-eventsource + spec.add_runtime_dependency "ld-celluloid-eventsource", "~> 0.4" end diff --git a/lib/ldclient-rb/stream.rb b/lib/ldclient-rb/stream.rb index 4e508990..7db8e555 100644 --- a/lib/ldclient-rb/stream.rb +++ b/lib/ldclient-rb/stream.rb @@ -1,6 +1,6 @@ require "concurrent/atomics" require "json" -require "ld-em-eventsource" +require "celluloid/eventsource" module LaunchDarkly PUT = "put" @@ -94,54 +94,34 @@ def get_feature(key) @store.get(key) end - def start_reactor - if defined?(Thin) - @config.logger.debug("Running in a Thin environment-- not starting EventMachine") - elsif EM.reactor_running? - @config.logger.debug("EventMachine already running") - else - @config.logger.debug("Starting EventMachine") - Thread.new { EM.run {} } - Thread.pass until EM.reactor_running? - end - EM.reactor_running? - end - def start - # Try to start the reactor. If it's not started, we shouldn't start - # the stream processor - return if not start_reactor - - # If someone else booted the stream processor connection, just return return unless @started.make_true + + headers = + { + 'Authorization' => 'api_key ' + @api_key, + 'User-Agent' => 'RubyClient/' + LaunchDarkly::VERSION + } + opts = {:headers => headers, :with_credentials => true} + @es = Celluloid::EventSource.new(@config.stream_uri + "/features", opts) do |conn| + conn.on_open do + set_connected + end - # If we're the first and only thread to set started, boot - # the stream processor connection - EM.defer do - boot_event_manager - end - end + conn.on(PUT) { |message| process_message(message, PUT) } + conn.on(PATCH) { |message| process_message(message, PATCH) } + conn.on(DELETE) { |message| process_message(message, DELETE) } - def boot_event_manager - source = EM::EventSource.new(@config.stream_uri + "/features", - {}, - "Accept" => "text/event-stream", - "Authorization" => "api_key " + @api_key, - "User-Agent" => "RubyClient/" + LaunchDarkly::VERSION) - source.on(PUT) { |message| process_message(message, PUT) } - source.on(PATCH) { |message| process_message(message, PATCH) } - source.on(DELETE) { |message| process_message(message, DELETE) } - source.error do |error| - @config.logger.info("[LDClient] Stream connection: #{error}") - set_disconnected + conn.on_error do |message| + # TODO replace this with proper logging + @config.logger.error("[LDClient] Error connecting to stream. Status code: #{message[:status_code]}") + set_disconnected + end end - source.inactivity_timeout = 0 - source.start - source end def process_message(message, method) - message = JSON.parse(message, symbolize_names: true) + message = JSON.parse(message.data, symbolize_names: true) if method == PUT @store.init(message) elsif method == PATCH @@ -168,6 +148,6 @@ def should_fallback_update end # TODO mark private methods - private :boot_event_manager, :process_message, :set_connected, :set_disconnected, :start_reactor + private :process_message, :set_connected, :set_disconnected end end diff --git a/spec/stream_spec.rb b/spec/stream_spec.rb index db5aecd6..c3c8e94c 100644 --- a/spec/stream_spec.rb +++ b/spec/stream_spec.rb @@ -1,4 +1,5 @@ require "spec_helper" +require 'ostruct' describe LaunchDarkly::InMemoryFeatureStore do subject { LaunchDarkly::InMemoryFeatureStore } @@ -33,55 +34,11 @@ subject { LaunchDarkly::StreamProcessor } let(:config) { LaunchDarkly::Config.new } let(:processor) { subject.new("api_key", config) } - describe '#start' do - it "will check if the reactor has started" do - expect(processor).to receive(:start_reactor).and_return false - expect(EM).to_not receive(:defer) - processor.start - end - it "will check if the stream processor has already started" do - expect(processor).to receive(:start_reactor).and_return true - processor.instance_variable_get(:@started).make_true - expect(EM).to_not receive(:defer) - processor.start - end - it "will boot the stream processor" do - expect(processor).to receive(:start_reactor).and_return true - expect(EM).to receive(:defer) - processor.start - end - end - - describe '#boot_event_manager' do - let(:message) { "asdf" } - before do - processor.instance_variable_get(:@config).instance_variable_set(:@stream_uri, "http://example.com/streaming") - expect_any_instance_of(EM::EventSource).to receive(:start) - source = processor.send(:boot_event_manager) - @req = source.instance_variable_get "@req" - # It seems testing EventManager is hard/impossible - end - it "will start" do - end - xit "will process put messages" do - expect(processor).to receive(:process_message).with(message, LaunchDarkly::PUT) - @req.stream_data("data: #{message}\nevent:#{LaunchDarkly::PUT}\n") - end - xit "will process patch messages" do - expect(processor).to receive(:process_message).with(message, LaunchDarkly::PATCH) - end - xit "will process delete messages" do - expect(processor).to receive(:process_message).with(message, LaunchDarkly::DELETE) - end - xit "will process errors" do - expect(processor).to receive(:set_disconnected) - end - end describe '#process_message' do - let(:put_message) { '{"key": {"value": "asdf"}}' } - let(:patch_message) { '{"path": "akey", "data": {"value": "asdf", "version": 1}}' } - let(:delete_message) { '{"path": "akey", "version": 2}' } + let(:put_message) { OpenStruct.new({data: '{"key": {"value": "asdf"}}'}) } + let(:patch_message) { OpenStruct.new({data: '{"path": "akey", "data": {"value": "asdf", "version": 1}}'}) } + let(:delete_message) { OpenStruct.new({data: '{"path": "akey", "version": 2}'}) } it "will accept PUT methods" do processor.send(:process_message, put_message, LaunchDarkly::PUT) expect(processor.instance_variable_get(:@store).get("key")).to eq(value: "asdf")