Skip to content

Commit

Permalink
Cleanup node class / extract node_callback into seperate module
Browse files Browse the repository at this point in the history
No functional changes. Adds complete spec suite testing all
node and node_callback edge cases
  • Loading branch information
movitto committed Apr 2, 2014
1 parent 95979d8 commit 1a11733
Show file tree
Hide file tree
Showing 11 changed files with 646 additions and 138 deletions.
165 changes: 83 additions & 82 deletions lib/rjr/node.rb
@@ -1,15 +1,14 @@
# RJR Base Node Interface
#
# Copyright (C) 2012-2013 Mohammed Morsi <mo@morsi.org>
# Copyright (C) 2012-2014 Mohammed Morsi <mo@morsi.org>
# Licensed under the Apache License, Version 2.0

require 'thread'
require 'socket'
require 'rjr/common'
require 'rjr/messages'
require 'rjr/dispatcher'
require 'rjr/em_adapter'
require 'rjr/thread_pool'
require 'rjr/node_callback'

module RJR

Expand All @@ -21,7 +20,8 @@ module RJR
# Each subclass should define
# * RJR_NODE_TYPE - unique id of the transport
# * listen method - begin listening for new requests and return
# * send_message(msg, connection) - send message using the specified connection (transport dependent)
# * send_message(msg, connection) - send message using the specified connection
# (transport dependent)
# * invoke - establish connection, send message, and wait for / return result
# * notify - establish connection, send message, and immediately return
#
Expand All @@ -44,33 +44,61 @@ class Node
# Dispatcher to use to satisfy requests
attr_accessor :dispatcher

# Handlers for various connection events
attr_reader :connection_event_handlers

class <<self
# Bool indiciting if this node is persistent
def persistent?
self.const_defined?(:PERSISTENT_NODE) &&
self.const_get(:PERSISTENT_NODE)
end

# Bool indiciting if this node is indirect
def indirect?
self.const_defined?(:INDIRECT_NODE) &&
self.const_get(:INDIRECT_NODE)
end
end

# Bool indicating if this node class is persistent
def persistent?
self.class.persistent?
end

# Bool indicating if this node class is indirect
def indirect?
self.class.indirect?
end

# alias of RJR_NODE_TYPE
def node_type
self.class::RJR_NODE_TYPE
self.class.const_defined?(:RJR_NODE_TYPE) ?
self.class.const_get(:RJR_NODE_TYPE) : nil
end

# XXX used by debugging / stats interface
def self.em ; defined?(@@em) ? @@em : nil end
def self.tp ; defined?(@@tp) ? @@tp : nil end
def self.em
defined?(@@em) ? @@em : nil
end

def em
self.class.em
end

def self.tp
defined?(@@tp) ? @@tp : nil
end

def tp
self.class.tp
end

# RJR::Node initializer
#
# @param [Hash] args options to set on request
# @option args [String] :node_id unique id of the node
# @option args [Hash<String,String>] :headers optional headers to set on all json-rpc messages
# @option args [Hash<String,String>] :headers optional headers to set
# on all json-rpc messages
# @option args [Dispatcher] :dispatcher dispatcher to assign to the node
def initialize(args = {})
clear_event_handlers
Expand All @@ -86,28 +114,29 @@ def initialize(args = {})
@@em ||= EMAdapter.new

# will do nothing if already started
@@tp.start
@@em.start
tp.start
em.start
end

# Block until the eventmachine reactor and thread pool have both completed running
# Block until the eventmachine reactor and thread pool have both
# completed running.
#
# @return self
def join
@@tp.join
@@em.join
tp.join
em.join
self
end

# Immediately terminate the node
#
# *Warning* this does what it says it does. All running threads, and reactor
# jobs are immediately killed
# *Warning* this does what it says it does. All running threads,
# and reactor jobs are immediately killed
#
# @return self
def halt
@@em.stop_event_loop
@@tp.stop
em.stop_event_loop
tp.stop
self
end

Expand All @@ -119,34 +148,46 @@ def clear_event_handlers

# Register connection event handler
# @param [:error, :close] event the event to register the handler for
# @param [Callable] handler block param to be added to array of handlers that are called when event occurs
# @param [Callable] handler block param to be added to array of handlers
# that are called when event occurs
# @yield [Node] self is passed to each registered handler when event occurs
def on(event, &handler)
if @connection_event_handlers.keys.include?(event)
@connection_event_handlers[event] << handler
end
return unless @connection_event_handlers.keys.include?(event)
@connection_event_handlers[event] << handler
end

private

# Internal helper, run connection event handlers for specified event
def connection_event(event)
if @connection_event_handlers.keys.include?(event)
@connection_event_handlers[event].each { |h|
h.call self
}
end
return unless @connection_event_handlers.keys.include?(event)
@connection_event_handlers[event].each { |h|
h.call self
}
end

##################################################################

# Internal helper, extract client info from connection
def client_for(connection)
# skip if an indirect node type or local
return nil, nil if self.indirect? || self.node_type == :local

begin
return Socket.unpack_sockaddr_in(connection.get_peername)
rescue Exception=>e
end

return nil, nil
end

# Internal helper, handle message received
def handle_message(msg, connection = {})
if Messages::Request.is_request_message?(msg)
@@tp << ThreadPoolJob.new(msg) { |m| handle_request(m, false, connection) }
tp << ThreadPoolJob.new(msg) { |m| handle_request(m, false, connection) }

elsif Messages::Notification.is_notification_message?(msg)
@@tp << ThreadPoolJob.new(msg) { |m| handle_request(m, true, connection) }
tp << ThreadPoolJob.new(msg) { |m| handle_request(m, true, connection) }

elsif Messages::Response.is_response_message?(msg)
handle_response(msg)
Expand All @@ -158,34 +199,26 @@ def handle_message(msg, connection = {})
def handle_request(data, notification=false, connection={})
# get client for the specified connection
# TODO should grap port/ip immediately on connection and use that
client_port,client_ip = nil,nil
begin
# XXX skip if an 'indirect' node type or local
unless [:amqp, :local].include?(self.class::RJR_NODE_TYPE)
client_port, client_ip =
Socket.unpack_sockaddr_in(connection.get_peername)
end
rescue Exception=>e
end
client_port,client_ip = client_for(connection)

msg = notification ?
Messages::Notification.new(:message => data,
:headers => @message_headers) :
Messages::Request.new(:message => data,
:headers => @message_headers)

result =
@dispatcher.dispatch(:rjr_method => msg.jr_method,
:rjr_method_args => msg.jr_args,
:rjr_headers => msg.headers,
:rjr_client_ip => client_ip,
:rjr_client_port => client_port,
:rjr_node => self,
:rjr_node_id => @node_id,
:rjr_node_type => self.class::RJR_NODE_TYPE,
:rjr_callback =>
NodeCallback.new(:node => self,
:connection => connection))
callback = NodeCallback.new(:node => self,
:connection => connection)

result = @dispatcher.dispatch(:rjr_method => msg.jr_method,
:rjr_method_args => msg.jr_args,
:rjr_headers => msg.headers,
:rjr_client_ip => client_ip,
:rjr_client_port => client_port,
:rjr_node => self,
:rjr_node_id => node_id,
:rjr_node_type => self.node_type,
:rjr_callback => callback)

unless notification
response = Messages::Response.new(:id => msg.msg_id,
Expand Down Expand Up @@ -244,37 +277,5 @@ def wait_for_result(message)
end
return res
end

end # class Node

# Node callback interface, used to invoke json-rpc methods
# against a remote node via node connection previously established
#
# After a node sends a json-rpc request to another, the either node may send
# additional requests to each other via the connection already established until
# it is closed on either end
class NodeCallback

# NodeCallback initializer
# @param [Hash] args the options to create the node callback with
# @option args [node] :node node used to send messages
# @option args [connection] :connection connection to be used in channel selection
def initialize(args = {})
@node = args[:node]
@connection = args[:connection]
end

def notify(callback_method, *data)
# XXX return if node type does not support
# pesistent conntections (throw err instead?)
return if @node.class::RJR_NODE_TYPE == :web

msg = Messages::Notification.new :method => callback_method,
:args => data, :headers => @node.message_headers

# TODO surround w/ begin/rescue block incase of socket errors / raise RJR::ConnectionError
@node.send_msg msg.to_s, @connection
end
end

end # module RJR
43 changes: 43 additions & 0 deletions lib/rjr/node_callback.rb
@@ -0,0 +1,43 @@
# RJR Node Callback
#
# Copyright (C) 2012-2014 Mohammed Morsi <mo@morsi.org>
# Licensed under the Apache License, Version 2.0

module RJR

# Node callback interface, used to invoke json-rpc
# methods against a remote node via node connection
# previously established
#
# After a node sends a json-rpc request to another,
# the either node may send additional requests to
# each other via the connection already established
# until it is closed on either end
class NodeCallback
attr_reader :node
attr_reader :connection

# NodeCallback initializer
# @param [Hash] args the options to create the node callback with
# @option args [node] :node node used to send messages
# @option args [connection] :connection connection to be used in
# channel selection
def initialize(args = {})
@node = args[:node]
@connection = args[:connection]
end

def notify(callback_method, *data)
# TODO throw error here ?
return unless node.persistent?

msg = Messages::Notification.new :method => callback_method,
:args => data,
:headers => @node.message_headers

# TODO surround w/ begin/rescue block,
# raise RJR::ConnectionError on socket errors
@node.send_msg msg.to_s, @connection
end
end # class NodeCallback
end # module RJR
1 change: 1 addition & 0 deletions lib/rjr/nodes/amqp.rb
Expand Up @@ -52,6 +52,7 @@ module Nodes
class AMQP < RJR::Node
RJR_NODE_TYPE = :amqp
PERSISTENT_NODE = true
INDIRECT_NODE = true

private

Expand Down
1 change: 1 addition & 0 deletions lib/rjr/nodes/local.rb
Expand Up @@ -38,6 +38,7 @@ module Nodes
class Local < RJR::Node
RJR_NODE_TYPE = :local
PERSISTENT_NODE = true
INDIRECT_NODE = false

# allows clients to override the node type for the local node
attr_accessor :node_type
Expand Down
1 change: 1 addition & 0 deletions lib/rjr/nodes/tcp.rb
Expand Up @@ -81,6 +81,7 @@ def send_msg(data)
class TCP < RJR::Node
RJR_NODE_TYPE = :tcp
PERSISTENT_NODE = true
INDIRECT_NODE = false

attr_accessor :connections

Expand Down
1 change: 1 addition & 0 deletions lib/rjr/nodes/unix.rb
Expand Up @@ -66,6 +66,7 @@ def send_msg(data)
class Unix < RJR::Node
RJR_NODE_TYPE = :unix
PERSISTENT_NODE = true
INDIRECT_NODE = false

attr_accessor :connections

Expand Down
1 change: 1 addition & 0 deletions lib/rjr/nodes/web.rb
Expand Up @@ -94,6 +94,7 @@ class Web < RJR::Node

RJR_NODE_TYPE = :web
PERSISTENT_NODE = false
INDIRECT_NODE = false

public

Expand Down
1 change: 1 addition & 0 deletions lib/rjr/nodes/ws.rb
Expand Up @@ -55,6 +55,7 @@ module Nodes
class WS < RJR::Node
RJR_NODE_TYPE = :ws
PERSISTENT_NODE = true
INDIRECT_NODE = false

private

Expand Down
2 changes: 2 additions & 0 deletions lib/rjr/thread_pool.rb
Expand Up @@ -3,6 +3,8 @@
# Copyright (C) 2010-2013 Mohammed Morsi <mo@morsi.org>
# Licensed under the Apache License, Version 2.0

require 'thread'

module RJR

# Work item to be executed in a thread launched by {ThreadPool}.
Expand Down

0 comments on commit 1a11733

Please sign in to comment.