Skip to content

Commit

Permalink
Automatically close sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
jnicklas committed Feb 4, 2014
1 parent e785332 commit b1363da
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 10 deletions.
36 changes: 32 additions & 4 deletions lib/celluloid/jeromq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'celluloid/jeromq/sockets'
require 'celluloid/jeromq/version'
require 'celluloid/jeromq/waker'
require 'set'

module Celluloid
# Actors which run alongside 0MQ sockets
Expand All @@ -13,6 +14,8 @@ module JeroMQ

UninitializedError = Class.new StandardError

@mutex = Mutex.new

class << self
attr_writer :context

Expand All @@ -24,18 +27,43 @@ def included(klass)

# Obtain a 0MQ context
def init(worker_threads = 1)
return @context if @context
@context = ZMQ.context(worker_threads)
@mutex.synchronize do
unless @context
@context = ZMQ.context(worker_threads)
@sockets = Set.new
end
@context
end
end

def context
raise UninitializedError, "you must initialize Celluloid::JeroMQ by calling Celluloid::JeroMQ.init" unless @context
@context
end

def open_socket(type)
@mutex.synchronize do
raise UninitializedError, "cannot create socket" unless @sockets
socket = Celluloid::JeroMQ.context.socket ZMQ.const_get(type.to_s.upcase)
@sockets.add(socket)
socket
end
end

def close_socket(socket)
@mutex.synchronize do
socket.close
@sockets.delete(socket) if @sockets
end
end

def terminate
@context.term if @context
@context = nil
@mutex.synchronize do
@sockets.each(&:close) if @sockets
@context.term if @context
@sockets = nil
@context = nil
end
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/celluloid/jeromq/sockets.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module JeroMQ
class Socket
# Create a new socket
def initialize(type)
@socket = Celluloid::JeroMQ.context.socket ZMQ.const_get(type.to_s.upcase)
@socket = JeroMQ.open_socket(type)
@linger = 0
end
attr_reader :linger
Expand Down Expand Up @@ -36,7 +36,7 @@ def bind(addr)

# Close the socket
def close
@socket.close
JeroMQ.close_socket(@socket)
end

alias_method :inspect, :to_s
Expand Down
8 changes: 4 additions & 4 deletions lib/celluloid/jeromq/waker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ class Waker
PAYLOAD = "\0" # the payload doesn't matter, it's just a signal

def initialize
@sender = JeroMQ.context.socket(ZMQ::PAIR)
@receiver = JeroMQ.context.socket(ZMQ::PAIR)
@sender = JeroMQ.open_socket(:pair)
@receiver = JeroMQ.open_socket(:pair)

@addr = "inproc://waker-#{object_id}"
@sender.bind @addr
Expand Down Expand Up @@ -40,8 +40,8 @@ def wait

# Clean up the IO objects associated with this waker
def cleanup
@sender_lock.synchronize { @sender.close rescue nil }
@receiver.close rescue nil
@sender_lock.synchronize { JeroMQ.close_socket(@sender) rescue nil }
JeroMQ.close_socket(@receiver) rescue nil
nil
end
alias_method :shutdown, :cleanup
Expand Down
12 changes: 12 additions & 0 deletions spec/celluloid/jeromq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ def bind(socket, index=0)
Celluloid::JeroMQ.init
Celluloid::JeroMQ.context.should_not be_nil
end

it "terminates even when actor does not shut down socket" do
actor = Class.new do
include Celluloid::JeroMQ
def initialize(port)
@socket = Celluloid::JeroMQ::RepSocket.new
@socket.connect("tcp://127.0.0.1:#{port}")
end
end
actor.new(ports[0]).terminate
Celluloid::JeroMQ.terminate
end
end

describe Celluloid::JeroMQ::RepSocket do
Expand Down

0 comments on commit b1363da

Please sign in to comment.