Permalink
Browse files

* Release 0.1.0.

	* lib/drb/drbfire.rb: Simplified to only use one port.

	* test/test_drbfire.rb: Removed obsolete tests related to the above
	  change.

	* lib/drb/drbfire.rb: Added id wrapping and one server per process
	  caveats to the documentation.
  • Loading branch information...
1 parent b31ae5f commit f322960b7c5f5e81dcc4ed90339f0ea58e8b2b93 @ntalbott committed Feb 26, 2004
Showing with 72 additions and 252 deletions.
  1. +12 −0 drbfire/ChangeLog
  2. +60 −88 drbfire/lib/drb/drbfire.rb
  3. +0 −164 drbfire/test/test_drbfire.rb
View
@@ -1,3 +1,15 @@
+Wed Feb 25 20:05:10 2004 Nathaniel Talbott <nathaniel@talbott.ws>
+
+ * Release 0.1.0.
+
+ * lib/drb/drbfire.rb: Simplified to only use one port.
+
+ * test/test_drbfire.rb: Removed obsolete tests related to the above
+ change.
+
+ * lib/drb/drbfire.rb: Added id wrapping and one server per process
+ caveats to the documentation.
+
Sun Jan 18 22:24:10 2004 Nathaniel Talbott <nathaniel@talbott.ws>
* Release 0.0.7.
@@ -64,6 +64,23 @@
# <tt>DRbFire::DELEGATE => DRb::DRbSSLSocket</tt>. Other
# DRb protcols may also work as delegates, but only the
# SSL protocol is tested.
+#
+#
+# == Caveats
+#
+# * DRbFire uses a 32-bit id space, meaning ids will wrap after
+# approximately ~4.2 billion connections. If that's a non-theoretical
+# problem for you, and you tell me about it, I'll figure out some
+# way to fix it. It'd be worth it just to find out that DRbFire is
+# being used in such a mind-blowing fashion.
+#
+# * You're limited to one _server_ per process at this point. You can
+# have (and handle) as many clients as you want (well, ok, so I just
+# said there's really a limit somewhere around 4.2 billion. I'm
+# trying to simplify here). Again, this is possible to deal with,
+# but not something that I've needed at this point and not something
+# I'm guessing is terribly common. Let me know if it's a problem for
+# you.
module DRbFire
@@ -85,7 +102,9 @@ module DRbFire
# Miscellaneous constants
SCHEME = "drbfire" #:nodoc:
ID_FORMAT = "N" #:nodoc:
- DEBUG = proc{|*a| p a if($DEBUG)} #:nodoc:
+ INCOMING_CONN = "1"
+ OUTGOING_CONN = "2"
+ SIGNAL_CONN = "3"
class Protocol < SimpleDelegator #nodoc:all
class ClientServer
@@ -94,8 +113,7 @@ class ClientServer
def initialize(uri, config)
@uri = uri
@config = config
- @connection = Protocol.open(Protocol.signal_uri(uri), config, nil)
- @connection.is_signal = true
+ @connection = Protocol.open(uri, config, SIGNAL_CONN)
@signal_id = @connection.read_signal_id
end
@@ -105,7 +123,8 @@ def uri
def accept
@connection.stream.read(1)
- connection = Protocol.open(@uri, @config, @signal_id)
+ connection = Protocol.open(@uri, @config, OUTGOING_CONN)
+ connection.stream.write([@signal_id].pack(ID_FORMAT))
connection
end
@@ -122,7 +141,6 @@ def initialize(connection, id)
end
def write_signal_id
- DEBUG["writing id", @id]
@connection.stream.write([@id].pack(ID_FORMAT))
end
@@ -141,24 +159,19 @@ def open
end
class << self
- def open_server(uri, config, signal=false)
- DEBUG['open_server', uri, config, signal]
+ def open_server(uri, config)
if(server?(config))
- signal_server = open_signal_server(uri, config) unless(signal)
- server = new(uri, delegate(config).open_server(uri, config))
- server.signal_socket = signal_server
- server
+ @client_servers ||= {}
+ new(uri, delegate(config).open_server(uri, config))
else
ClientServer.new(uri, config)
end
end
- def open(uri, config, id=0)
- DEBUG['open', uri, config, id]
+ def open(uri, config, type=INCOMING_CONN)
unless(server?(config))
connection = new(uri, delegate(config).open(uri, config))
- DEBUG["writing id", id] if(id)
- connection.stream.write([id].pack(ID_FORMAT)) if(id)
+ connection.stream.write(type)
connection
else
@client_servers[parse_uri(uri).last.to_i].open
@@ -169,7 +182,6 @@ def add_client_connection(id, connection)
if((c = @client_servers[id]))
c.push(connection)
else
- DEBUG['add_client_connection', 'invalid client', id]
end
end
@@ -191,13 +203,6 @@ def uri_option(uri, config)
return "#{SCHEME}://#{host}:#{port}", option
end
- def signal_uri(uri)
- parts = parse_uri(uri)
- parts[1] += 1
- signal_uri = "#{SCHEME}://%s:%d?%s" % parts
- signal_uri.sub(/\?$/, '')
- end
-
private
def server?(config)
@@ -215,94 +220,61 @@ def parse_uri(uri)
@delegate.parse_uri(uri)
end
- def uri_option(uri, config)
- @delegate.uri_option(uri, config)
- end
+ def uri_option(uri, config)
+ @delegate.uri_option(uri, config)
+ end
end
end
@delegate.delegate = self
end
@delegate
end
-
- def open_signal_server(uri, config)
- @client_servers ||= {}
- signal_server = open_server(signal_uri(uri), config, true)
- signal_server.is_signal = true
- signal_server.start_signal_server
- signal_server
- end
end
- attr_writer :signal_socket, :is_signal
attr_reader :signal_id, :uri
def initialize(uri, delegate)
super(delegate)
@uri = uri
- @signal_socket = nil
- @signal_server_thread = nil
- @is_signal = false
- end
-
- def close
- if(@signal_server_thread)
- @signal_server_thread.kill
- end
- __getobj__.close
- if(@signal_socket)
- @signal_socket.close
- @signal_socket = nil
- end
+ @id = 0
+ @id_mutex = Mutex.new
end
def accept
- if(@is_signal)
- connection = self.class.new(nil, __getobj__.accept)
- connection.is_signal = true
- connection
- else
- while(__getobj__.instance_eval{@socket})
- begin
- connection = self.class.new(nil, __getobj__.accept)
- rescue IOError
- return nil
- end
- begin
- id = connection.stream.read(4).unpack(ID_FORMAT).first
- rescue
- next
- end
- next unless(id)
- DEBUG["accepted id", id]
- return connection if(id == 0)
- self.class.add_client_connection(id, connection)
+ while(__getobj__.instance_eval{@socket})
+ begin
+ connection = self.class.new(nil, __getobj__.accept)
+ rescue IOError
+ return nil
end
- end
- end
-
- def start_signal_server
- @signal_server_thread = Thread.start(self) do |server|
- id = 0
- m = Mutex.new
- loop do
- Thread.start(server.accept) do |client|
- new_id = nil
- m.synchronize do
- new_id = (id += 1)
- end
- client_server = ClientServerProxy.new(client, new_id)
- self.class.add_client_server(new_id, client_server)
- client_server.write_signal_id
+ begin
+ type = connection.stream.read(1)
+ rescue
+ next
+ end
+ case type
+ when INCOMING_CONN
+ return connection
+ when OUTGOING_CONN
+ self.class.add_client_connection(connection.read_signal_id, connection)
+ next
+ when SIGNAL_CONN
+ new_id = nil
+ @id_mutex.synchronize do
+ new_id = (@id += 1)
end
+ client_server = ClientServerProxy.new(connection, new_id)
+ self.class.add_client_server(new_id, client_server)
+ client_server.write_signal_id
+ next
+ else
+ raise "Invalid type #{type}"
end
end
end
def read_signal_id
- id = stream.read(4).unpack(ID_FORMAT).first
- DEBUG["read_signal_id", id]
- id
+ stream.read(4).unpack(ID_FORMAT).first
end
end
end
Oops, something went wrong.

0 comments on commit f322960

Please sign in to comment.