Skip to content

Commit

Permalink
Event routing support
Browse files Browse the repository at this point in the history
Observers can now actually receive messages
  • Loading branch information
karmajunkie committed Apr 15, 2014
1 parent af84936 commit 01327c0
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 46 deletions.
2 changes: 2 additions & 0 deletions lib/replay.rb
Expand Up @@ -13,6 +13,7 @@ class ReplayError < StandardError; end
class UndefinedKeyError < ReplayError; end
class UnhandledEventError < ReplayError; end
class UnknownEventError < ReplayError; end
class InvalidRouterError < ReplayError; end
class InvalidStorageError < ReplayError;
def initialize(*args)
klass = args.shift
Expand All @@ -33,6 +34,7 @@ def initialize(*args)
require 'replay/event_declarations'
require 'replay/publisher'
require 'replay/subscription_manager'
require 'replay/subscriptions'
require 'replay/backends'
require 'replay/repository'
require 'replay/repository/identity_map'
Expand Down
29 changes: 20 additions & 9 deletions lib/replay/observer.rb
Expand Up @@ -2,16 +2,27 @@ module Replay
module Observer

def self.included(base)
class << base
def observe(event_type, &block)
@observer_blocks ||= Hash.new
@observer_blocks[Replay::Inflector.underscore(event_type.to_s)] = block
end
base.extend(ClassMethods)
base.instance_variable_set(:@router, Replay::Router::DefaultRouter)
end

module ClassMethods
def router(rtr)
raise Replay::InvalidRouterError.new("Router does not implement add_observer") unless rtr.respond_to?(:add_observer)
@router = rtr
end

def observe(event_type, &block)
raise InvalidRouterError.new("No router defined!") unless @router
@observer_blocks ||= Hash.new
@observer_blocks[Replay::Inflector.underscore(event_type.to_s)] = block

@router.add_observer self, event_type
end

def published(stream_id, event)
blk = @observer_blocks[Replay::Inflector.underscore(event.class.to_s)]
blk.call(stream_id, event, binding) if blk
end
def published(stream_id, event)
blk = @observer_blocks[Replay::Inflector.underscore(event.class.to_s)]
blk.call(stream_id, event, binding) if blk
end
end
end
Expand Down
15 changes: 3 additions & 12 deletions lib/replay/publisher.rb
@@ -1,21 +1,12 @@
module Replay
module Publisher
def self.included(base)
include_essentials base
end

def self.include_essentials(base)
base.instance_variable_set :@application_blocks, {}
base.extend ClassMethods
base.extend(Replay::Events)
end

def subscription_manager
@subscription_manager ||= Replay::SubscriptionManager.new(Replay.logger)
end

def add_subscriber(subscriber)
subscription_manager.add_subscriber(subscriber)
base.class_exec do
include Replay::Subscriptions
end
end

def apply(events, raise_unhandled = true)
Expand Down
19 changes: 19 additions & 0 deletions lib/replay/router.rb
@@ -1,5 +1,24 @@
module Replay
module Router
def self.included(base)
base.class_exec do
include Replay::Subscriptions
extend ClassMethods if base.include?(Singleton)
end
end

def add_observer(observer, *events)
add_subscriber observer
end

module ClassMethods
def add_observer(observer, *events)
instance.add_subscriber(observer)
end

def published(stream, event)
instance.published(stream, event)
end
end
end
end
13 changes: 1 addition & 12 deletions lib/replay/router/default_router.rb
Expand Up @@ -2,18 +2,7 @@ module Replay
module Router
class DefaultRouter
include Singleton

def initialize
@subscription_manager = Replay::SubscriptionManager.new
end

def add_observer(observer, *events)
@subscription_manager.add_subscriber(observer)
end

def published(stream_id, event)
@subscription_manager.notify_subscribers(stream_id, event)
end
include Replay::Router
end
end
end
Expand Down
15 changes: 15 additions & 0 deletions lib/replay/subscriptions.rb
@@ -0,0 +1,15 @@
module Replay
module Subscriptions
def subscription_manager
@subscription_manager ||= Replay::SubscriptionManager.new(Replay.logger)
end

def add_subscriber(subscriber)
subscription_manager.add_subscriber(subscriber)
end

def published(stream_id, event)
@subscription_manager.notify_subscribers(stream_id, event)
end
end
end
10 changes: 10 additions & 0 deletions lib/replay/test.rb
Expand Up @@ -37,6 +37,16 @@ def has_subscriber?(subscriber)
end
end

Replay::Router.module_exec do
def inspect
self.class.to_s
end

def observed_by?(object)
@subscription_manager.has_subscriber?(object)
end
end

Replay::Publisher::ClassMethods.module_exec do
def self.extended(base)
@publishers ||= []
Expand Down
21 changes: 21 additions & 0 deletions test/replay/observer_spec.rb
Expand Up @@ -21,6 +21,19 @@ def self.reset
end
end

class NonstandardRouter
include Singleton
include Replay::Router
end

class RoutedObserverTest
include Replay::Observer
router NonstandardRouter.instance

observe ObservedEvent do |e|
end
end

describe Replay::Observer do
before do
ObserverTest.reset
Expand All @@ -34,4 +47,12 @@ def self.reset
ObserverTest.published('123', UnobservedEvent.new)
ObserverTest.wont_be :observed?
end

it "links to DefaultRouter by default" do
Replay::Router::DefaultRouter.instance.must_be :observed_by?,ObserverTest
end

it "links to a substitute router when instructed" do
NonstandardRouter.instance.must_be :observed_by?, RoutedObserverTest
end
end
12 changes: 1 addition & 11 deletions test/replay/router/default_router_spec.rb
@@ -1,12 +1,4 @@
require 'test_helper'

module Router
module Test
def has_observer?(object)
@subscription_manager.has_subscriber?(object)
end
end
end
class TypedEvent

end
Expand All @@ -20,16 +12,14 @@ def self.typed_received?
end
end

Replay::Router::DefaultRouter.send(:include, Router::Test)

describe Replay::Router::DefaultRouter do
before do
@router = Replay::Router::DefaultRouter.instance
end
describe "adding observers" do
it "tracks the observing object" do
@router.add_observer(Observer)
assert(@router.has_observer?(Observer), "router does not track observer")
@router.must_be :observed_by?, Observer
end
end
describe "event publishing" do
Expand Down
2 changes: 0 additions & 2 deletions test/test_helper.rb
@@ -1,6 +1,4 @@
gem 'minitest'
gem 'replay'
gem 'byebug'

require 'minitest/autorun'
require 'minitest/spec'
Expand Down

0 comments on commit 01327c0

Please sign in to comment.