Skip to content

Commit

Permalink
halfway commit - moving computers.
Browse files Browse the repository at this point in the history
adding some nice OSX features to Gemfile.

started adding docs for worker session.
  • Loading branch information
sundbp committed Oct 8, 2011
1 parent bfc2a2c commit e13d53b
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 41 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ pkg/
Gemfile.lock
vendor/cache/*.gem
._redcar/
bin/
.redcar/
.bundle/
14 changes: 12 additions & 2 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'rbconfig'

source :rubygems

gemspec
Expand All @@ -18,8 +20,16 @@ group :development do
gem 'fuubar', '~> 0.0.6'
gem 'guard', '~> 0.3.0'
gem 'guard-rspec', '~> 0.2.0'
gem 'rb-notifu'
gem 'rb-fchange'

if RbConfig::CONFIG['host_os'] == 'windows'
gem 'rb-notifu'
gem 'rb-fchange'
end

if RbConfig::CONFIG['host_os'] == 'darwin'
gem 'rb-fsevent'
gem 'growl'
end
end

group :test do
Expand Down
5 changes: 5 additions & 0 deletions lib/mdp.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
require 'mdp/version'
require 'mdp/exceptions'

require 'mdp/broker'
require 'mdp/worker_session'
require 'mdp/client_session'
require 'mdp/async_client_session'

# MajorDomoProtocol module
#
# Contains all the classes used to participate in a majordomo
# service oriented system.
module MDP
MDPC_CLIENT = "MDPC01"
MDPW_WORKER = "MDPW01"
Expand Down
152 changes: 113 additions & 39 deletions lib/mdp/worker_session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,63 @@
require 'mdp'

module MDP

# Represents a worker session.
#
# A worker in MDP is something responding to requests for a given service.
# A worker registers with a broker and will then be forwarded client requests.
# Many workers for the same service can be running to scale out performance.
# The broker will ensure the client requests are shared among the workers in
# a sensible way.
#
# The typical flow of a worker looks something like this:
# @example
# class EchoWorker
# def initialize(broker_endpoint = 'tcp://127.0.0.1:5555')
# @session = MDP::WorkerSession.new("echo", :broker_endpoint => broker_endpoint)
# end
#
# def run
# reply = nil
# loop do
# request = @session.recv(reply)
# break if request.nil? # we got interrupted or failed
# reply = request
# end
# @session.shutdown
# end
# end
#
# On a multi-core machine there is nothing stopping us having one process
# with several threads where each thread is a worker.
class WorkerSession

# @attr_reader [String] service_name The name of the service the worker serves requests for
attr_reader :service_name

# Defaults used for the worker
#
# Note that the worker settings around heartbeats should be
# compatible with what the broker is expecting.
DEFAULTS = {
:verbose => true,
:heartbeat_interval => 2500, # in milliseconds
:heartbeat_liveness => 3, # in milliseconds
:reconnect_interval => 2500, # in milliseconds
}

# Create a worker session
#
# Note this creates a 0mq context and attempts to connect a 0mq socket to the broker.
#
# @param [String] service_name the name of the service this worker serves requests for
# @param [String] broker_endpoint where the broker to connect to lives
# @param [Hash] options a hash of options to override the DEFAULTS
# @option options [true|false] :verbose turn on verbose output via the logger
# @option options [Fixnum] :hearbeat_interval the hearbeat intervall given in milliseconds
# @option options [Fixnum] :hearbeat_liveness how many hearbeats can fail before we reconnect
# @option options [Fixnum] :reconnect_interval how long to wait before reconnecting
# @option options [#log] :logger a custom logger to be used by session
def initialize(service_name,
broker_endpoint = 'tcp://127.0.0.1:5555',
options = {})
Expand All @@ -25,6 +71,13 @@ def initialize(service_name,
connect_to_broker()
end

# Connect the worker session to the broker
#
# This will close an already open socket, and create a new socket it connects
# to the broker endpoint.
#
# Resets when the next heartbeat is to take place.
# @return self
def connect_to_broker
@worker.close unless @worker.nil?
@worker = @context.socket(ZMQ::DEALER)
Expand All @@ -36,40 +89,34 @@ def connect_to_broker
send_ready
@liveness = @options[:heartbeat_liveness]
@next_heartbeat = next_heartbeat
self
end

def send_ready
send(MDP::MDPW_READY, self.service_name)
end

def send_reply(reply)
send(MDP::MDPW_REPLY, nil, reply)
end

def send_heartbeat
send(MDP::MDPW_HEARTBEAT)
end

def send(command, option = nil, input_msg = nil)
msg = input_msg.nil? ? ZMQ::StringMultipartMessage.new : input_msg.duplicate
msg.push option unless option.nil?
msg.push command
msg.push MDP::MDPW_WORKER
msg.push ""
raise MDPError.new("Trying to send msg on socket that isn't open!") if @worker.nil?
rc = @worker.send_strings(msg)
raise MDPError.new("Failed to send msg!") unless ZMQ::Util.resultcode_ok?(rc)
rc
end

def next_heartbeat
Time.now + heartbeat_interval.to_f / 1000.0
end

def heartbeat_interval
@options[:heartbeat_interval]

# Reconnect the worker session to the broker
#
# This essentially calls (#connect_to_broker) but also handles de/re-registering
# the socket to a given poller.
#
# @param [ZMQ::Poller] poller an already existing poller used by the worker.
# @return self
def reconnect(poller)
poller.deregister_readable @worker
connect_to_broker()
poller.register_readable @worker
self
end


# Receive a client request (and deliver any existing reply)
#
# This method tends to be used in a loop like this:
#
# reply = nil
# loop do
# request = @session.recv(reply)
# break if request.nil? # we got interrupted or failed
# reply = handle_request(request)
# end
#
def recv(reply = nil)
return nil if reply.nil? and @expect_reply

Expand Down Expand Up @@ -150,8 +197,8 @@ def recv(reply = nil)
elsif results == 0
@liveness -= 1
if @liveness == 0
log "Disconnected from broker - retrying.."
sleep(@options[:reconnect_interval].to_f / 1000.0)
log "Disconnected from broker - retrying.."
reconnect(poller)
end

Expand All @@ -163,11 +210,37 @@ def recv(reply = nil)
end

end

def send_ready
send(MDP::MDPW_READY, self.service_name)
end

def reconnect(poller)
poller.deregister_readable @worker
connect_to_broker()
poller.register_readable @worker
def send_reply(reply)
send(MDP::MDPW_REPLY, nil, reply)
end

def send_heartbeat
send(MDP::MDPW_HEARTBEAT)
end

def send(command, option = nil, input_msg = nil)
msg = input_msg.nil? ? ZMQ::StringMultipartMessage.new : input_msg.duplicate
msg.push option unless option.nil?
msg.push command
msg.push MDP::MDPW_WORKER
msg.push ""
raise MDPError.new("Trying to send msg on socket that isn't open!") if @worker.nil?
rc = @worker.send_strings(msg)
raise MDPError.new("Failed to send msg!") unless ZMQ::Util.resultcode_ok?(rc)
rc
end

def next_heartbeat
Time.now + heartbeat_interval.to_f / 1000.0
end

def heartbeat_interval
@options[:heartbeat_interval]
end

def log(msg)
Expand All @@ -189,5 +262,6 @@ def shutdown
@worker.close unless @worker.nil?
@context.terminate unless @context.nil?
end
end
end

end # class WorkerSession
end # module MDP

0 comments on commit e13d53b

Please sign in to comment.