Skip to content

Commit

Permalink
dcell: attempt to use msgpack for RPC v1
Browse files Browse the repository at this point in the history
  • Loading branch information
niamster committed Dec 14, 2014
1 parent 38fd929 commit 72c66b2
Show file tree
Hide file tree
Showing 15 changed files with 156 additions and 412 deletions.
1 change: 1 addition & 0 deletions dcell.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "celluloid-zmq", "~> 0.16.0"
gem.add_runtime_dependency "reel", "~> 0.4.0"
gem.add_runtime_dependency "http", "~> 0.5.0"
gem.add_runtime_dependency "msgpack"
gem.add_runtime_dependency "celluloid-redis"
gem.add_runtime_dependency "redis-namespace"

Expand Down
4 changes: 1 addition & 3 deletions lib/dcell.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@
require 'celluloid/zmq'
require 'socket'
require 'securerandom'
require 'msgpack'

Celluloid::ZMQ.init

require 'dcell/version'
require 'dcell/actor_proxy'
require 'dcell/directory'
require 'dcell/mailbox_proxy'
require 'dcell/messages'
require 'dcell/node'
require 'dcell/node_manager'
require 'dcell/global'
require 'dcell/responses'
require 'dcell/router'
require 'dcell/rpc'
require 'dcell/future_proxy'
require 'dcell/server'
require 'dcell/info_service'

Expand Down
34 changes: 11 additions & 23 deletions lib/dcell/actor_proxy.rb
Original file line number Diff line number Diff line change
@@ -1,30 +1,18 @@
module DCell
# Proxy object for actors that live on remote nodes
class CellProxy < Celluloid::CellProxy; end

class ThreadHandleProxy
def kill
raise NotImplementedError, "remote kill not supported"
end

def join(timeout)
raise NotImplementedError, "remote join not supported"
class ActorProxy
def initialize(dnode, rmailbox)
@dnode, @rmailbox = dnode, rmailbox
end
end

class SubjectProxy
def class
"[remote]"
end
end

class Actor
def initialize(mailbox)
@mailbox = mailbox
@thread = ThreadHandleProxy.new
@subject = SubjectProxy.new
@proxy = Celluloid::ActorProxy.new(@thread, @mailbox)
def method_missing(meth, *args, &block)
message = {:mailbox => @rmailbox, :meth => meth, :args => args, :block => block_given?}
res = @dnode.send_request Message::Relay.new(Thread.mailbox, message)
if block_given?
yield res
else
res
end
end
attr_reader :mailbox, :thread, :subject, :proxy
end
end
109 changes: 5 additions & 104 deletions lib/dcell/celluloid_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,112 +9,13 @@
# DCell overlay network back to the node where the actor actually exists

module Celluloid
class CellProxy
# Marshal uses respond_to? to determine if this object supports _dump so
# unfortunately we have to monkeypatch in _dump support as the proxy
# itself normally jacks respond_to? and proxies to the actor
alias_method :__respond_to?, :respond_to?
def respond_to?(meth, check_private = false)
return false if meth == :marshal_dump
return true if meth == :_dump
__respond_to?(meth, check_private)
end

# Dump an actor proxy via its mailbox
def _dump(level)
@mailbox._dump(level)
end

# Create an actor proxy object which routes messages over DCell's overlay
# network and back to the original mailbox
def self._load(string)
mailbox = ::Celluloid::Mailbox._load(string)

case mailbox
when ::DCell::MailboxProxy
actor = ::DCell::Actor.new(mailbox)
::DCell::CellProxy.new actor.proxy, mailbox, actor.subject.class.to_s
when ::Celluloid::Mailbox
actor = find_actor(mailbox)
::Celluloid::CellProxy.new actor.proxy, mailbox, actor.behavior.subject.class.to_s
else
::Kernel.raise "funny, I did not expect to see a #{mailbox.class} here"
end
end

def self.find_actor(mailbox)
::Thread.list.each do |t|
if actor = t[:celluloid_actor]
return actor if actor.mailbox == mailbox
end
end
::Kernel.raise "no actor found for mailbox: #{mailbox.inspect}"
end
end

class Mailbox
def address
"#{@address}@#{DCell.id}"
end

# This custom dumper registers actors with the DCell registry so they can
# be reached remotely.
def _dump(level)
def to_msgpack(pk=nil)
DCell::Router.register self
address
end

# Create a mailbox proxy object which routes messages over DCell's overlay
# network and back to the original mailbox
def self._load(string)
DCell::MailboxProxy._load(string)
end
end

class SyncCall
def _dump(level)
uuid = DCell::RPC::Manager.register self
payload = Marshal.dump([@sender,@method,@arguments,@block])
"#{uuid}@#{DCell.id}:rpc:#{payload}"
end

def self._load(string)
DCell::RPC._load(string)
end
end

class BlockProxy
def _dump(level)
uuid = DCell::RPC::Manager.register self
payload = Marshal.dump([@mailbox,@execution,@arguments])
"#{uuid}@#{DCell.id}:rpb:#{payload}"
end

def self._load(string)
DCell::RPC._load(string)
end
end

class BlockCall
def _dump(level)
uuid = DCell::RPC::Manager.register self
payload = Marshal.dump([@block_proxy,@sender,@arguments])
"#{uuid}@#{DCell.id}:rpbc:#{payload}"
end

def self._load(string)
DCell::RPC._load(string)
end
end

class Future
def _dump(level)
mailbox_id = DCell::Router.register self
"#{mailbox_id}@#{DCell.id}"
end

def self._load(string)
DCell::FutureProxy._load(string)
{
:address => @address,
:id => DCell.id
}.to_msgpack(pk)
end
end
end
32 changes: 0 additions & 32 deletions lib/dcell/future_proxy.rb

This file was deleted.

58 changes: 0 additions & 58 deletions lib/dcell/mailbox_proxy.rb

This file was deleted.

73 changes: 66 additions & 7 deletions lib/dcell/messages.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module DCell
class Message
attr_reader :id
attr_accessor :id

def initialize
# Memoize the original object ID so it will get marshalled
Expand All @@ -19,6 +19,14 @@ def dispatch
node = DCell::Node[@id]
node.handle_heartbeat @from if node
end

def to_msgpack(pk=nil)
{
:type => self.class.name,
:id => @id,
:args => [@from]
}.to_msgpack(pk)
end
end

# Query a node for the address of an actor
Expand All @@ -31,7 +39,21 @@ def initialize(sender, name)
end

def dispatch
@sender << SuccessResponse.new(@id, Celluloid::Actor[@name])
actor = Celluloid::Actor[@name]
if actor
mailbox = actor.mailbox
else
mailbox = nil
end
Node[@sender[:id]] << SuccessResponse.new(@id, @sender[:address], mailbox)
end

def to_msgpack(pk=nil)
{
:type => self.class.name,
:id => @id,
:args => [@sender, @name]
}.to_msgpack(pk)
end
end

Expand All @@ -45,21 +67,58 @@ def initialize(sender)
end

def dispatch
@sender << SuccessResponse.new(@id, Celluloid::Actor.registered)
Node[@sender[:id]] << SuccessResponse.new(@id, @sender[:address], Celluloid::Actor.registered)
end

def to_msgpack(pk=nil)
{
:type => self.class.name,
:id => @id,
:args => [@sender]
}.to_msgpack(pk)
end
end

# Relay a message to the given recipient
class Relay < Message
attr_reader :recipient, :message
attr_reader :sender, :message

def initialize(recipient, message)
def initialize(sender, message)
super()
@recipient, @message = recipient, message
@sender, @message = sender, message
end

def find_actor(mailbox)
::Thread.list.each do |t|
if actor = t[:celluloid_actor]
return actor if actor.mailbox.address == mailbox[:address]
end
end
nil
end

def dispatch
@recipient << @message
actor = find_actor(message[:mailbox])
if actor
value = nil
if message[:block]
Celluloid::Actor::call actor.mailbox, message[:meth], *message[:args] {|v| value = v}
else
value = Celluloid::Actor::call actor.mailbox, message[:meth], *message[:args]
end
rsp = SuccessResponse.new(@id, @sender[:address], value)
else
rsp = ErrorResponse.new(@id, @sender[:address], {:class => Celluloid::DeadActorError.name, :msg => nil})
end
Node[@sender[:id]].async.send_message rsp
end

def to_msgpack(pk=nil)
{
:type => self.class.name,
:id => @id,
:args => [@sender, @message]
}.to_msgpack(pk)
end
end
end
Expand Down
Loading

0 comments on commit 72c66b2

Please sign in to comment.