Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ldclient-rb.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency "net-http-persistent", "~> 2.9"
spec.add_runtime_dependency "concurrent-ruby", "~> 1.0.0"
spec.add_runtime_dependency "hashdiff", "~> 0.2"
spec.add_runtime_dependency "ld-em-eventsource", "~> 0.2" #https://github.com/launchdarkly/em-eventsource
spec.add_runtime_dependency "ld-celluloid-eventsource", "~> 0.4"
end
64 changes: 22 additions & 42 deletions lib/ldclient-rb/stream.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require "concurrent/atomics"
require "json"
require "ld-em-eventsource"
require "celluloid/eventsource"

module LaunchDarkly
PUT = "put"
Expand Down Expand Up @@ -94,54 +94,34 @@ def get_feature(key)
@store.get(key)
end

def start_reactor
if defined?(Thin)
@config.logger.debug("Running in a Thin environment-- not starting EventMachine")
elsif EM.reactor_running?
@config.logger.debug("EventMachine already running")
else
@config.logger.debug("Starting EventMachine")
Thread.new { EM.run {} }
Thread.pass until EM.reactor_running?
end
EM.reactor_running?
end

def start
# Try to start the reactor. If it's not started, we shouldn't start
# the stream processor
return if not start_reactor

# If someone else booted the stream processor connection, just return
return unless @started.make_true

headers =
{
'Authorization' => 'api_key ' + @api_key,
'User-Agent' => 'RubyClient/' + LaunchDarkly::VERSION
}
opts = {:headers => headers, :with_credentials => true}
@es = Celluloid::EventSource.new(@config.stream_uri + "/features", opts) do |conn|
conn.on_open do
set_connected
end

# If we're the first and only thread to set started, boot
# the stream processor connection
EM.defer do
boot_event_manager
end
end
conn.on(PUT) { |message| process_message(message, PUT) }
conn.on(PATCH) { |message| process_message(message, PATCH) }
conn.on(DELETE) { |message| process_message(message, DELETE) }

def boot_event_manager
source = EM::EventSource.new(@config.stream_uri + "/features",
{},
"Accept" => "text/event-stream",
"Authorization" => "api_key " + @api_key,
"User-Agent" => "RubyClient/" + LaunchDarkly::VERSION)
source.on(PUT) { |message| process_message(message, PUT) }
source.on(PATCH) { |message| process_message(message, PATCH) }
source.on(DELETE) { |message| process_message(message, DELETE) }
source.error do |error|
@config.logger.info("[LDClient] Stream connection: #{error}")
set_disconnected
conn.on_error do |message|
# TODO replace this with proper logging
@config.logger.error("[LDClient] Error connecting to stream. Status code: #{message[:status_code]}")
set_disconnected
end
end
source.inactivity_timeout = 0
source.start
source
end

def process_message(message, method)
message = JSON.parse(message, symbolize_names: true)
message = JSON.parse(message.data, symbolize_names: true)
if method == PUT
@store.init(message)
elsif method == PATCH
Expand All @@ -168,6 +148,6 @@ def should_fallback_update
end

# TODO mark private methods
private :boot_event_manager, :process_message, :set_connected, :set_disconnected, :start_reactor
private :process_message, :set_connected, :set_disconnected
end
end
51 changes: 4 additions & 47 deletions spec/stream_spec.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "spec_helper"
require 'ostruct'

describe LaunchDarkly::InMemoryFeatureStore do
subject { LaunchDarkly::InMemoryFeatureStore }
Expand Down Expand Up @@ -33,55 +34,11 @@
subject { LaunchDarkly::StreamProcessor }
let(:config) { LaunchDarkly::Config.new }
let(:processor) { subject.new("api_key", config) }
describe '#start' do
it "will check if the reactor has started" do
expect(processor).to receive(:start_reactor).and_return false
expect(EM).to_not receive(:defer)
processor.start
end
it "will check if the stream processor has already started" do
expect(processor).to receive(:start_reactor).and_return true
processor.instance_variable_get(:@started).make_true
expect(EM).to_not receive(:defer)
processor.start
end
it "will boot the stream processor" do
expect(processor).to receive(:start_reactor).and_return true
expect(EM).to receive(:defer)
processor.start
end
end

describe '#boot_event_manager' do
let(:message) { "asdf" }
before do
processor.instance_variable_get(:@config).instance_variable_set(:@stream_uri, "http://example.com/streaming")
expect_any_instance_of(EM::EventSource).to receive(:start)
source = processor.send(:boot_event_manager)
@req = source.instance_variable_get "@req"
# It seems testing EventManager is hard/impossible
end
it "will start" do
end
xit "will process put messages" do
expect(processor).to receive(:process_message).with(message, LaunchDarkly::PUT)
@req.stream_data("data: #{message}\nevent:#{LaunchDarkly::PUT}\n")
end
xit "will process patch messages" do
expect(processor).to receive(:process_message).with(message, LaunchDarkly::PATCH)
end
xit "will process delete messages" do
expect(processor).to receive(:process_message).with(message, LaunchDarkly::DELETE)
end
xit "will process errors" do
expect(processor).to receive(:set_disconnected)
end
end

describe '#process_message' do
let(:put_message) { '{"key": {"value": "asdf"}}' }
let(:patch_message) { '{"path": "akey", "data": {"value": "asdf", "version": 1}}' }
let(:delete_message) { '{"path": "akey", "version": 2}' }
let(:put_message) { OpenStruct.new({data: '{"key": {"value": "asdf"}}'}) }
let(:patch_message) { OpenStruct.new({data: '{"path": "akey", "data": {"value": "asdf", "version": 1}}'}) }
let(:delete_message) { OpenStruct.new({data: '{"path": "akey", "version": 2}'}) }
it "will accept PUT methods" do
processor.send(:process_message, put_message, LaunchDarkly::PUT)
expect(processor.instance_variable_get(:@store).get("key")).to eq(value: "asdf")
Expand Down