Permalink
Browse files

doc changes for yard

  • Loading branch information...
1 parent 62e59d5 commit 51844d70451ddf32f9fe05ee9e911121249a4a12 @derekcollison derekcollison committed Nov 21, 2010
Showing with 63 additions and 84 deletions.
  1. +0 −3 Gemfile.lock
  2. +63 −81 lib/nats/client.rb
View
@@ -34,6 +34,3 @@ DEPENDENCIES
nats!
rspec
yajl-ruby (>= 0.7.8)
-
-METADATA
- version: 1.0.6
View
@@ -1,78 +1,21 @@
-
require 'uri'
require File.dirname(__FILE__) + '/ext/em'
require File.dirname(__FILE__) + '/ext/bytesize'
require File.dirname(__FILE__) + '/ext/json'
-# NATS is a simple publish-subscribe messaging system.
-#
-# == Usage
-# <tt>
-# require "nats/client"
-#
-# NATS.start do
-#
-# # Simple Subscriber
-# NATS.subscribe('foo') { |msg| puts "Msg received : '#{msg}'" }
-#
-# # Simple Publisher
-# NATS.publish('foo.bar.baz', 'Hello World!')
-#
-# # Publish with closure, callback fires when server has processed the message
-# NATS.publish('foo', 'You done?') { puts 'msg processed!' }
-#
-# # Unsubscribing
-# s = NATS.subscribe('bar') { |msg| puts "Msg received : '#{msg}'" }
-# NATS.unsubscribe(s)
-#
-# # Request/Response
-#
-# # The helper
-# NATS.subscribe('help') do |msg, reply|
-# NATS.publish(reply, "I'll help!")
-# end
-#
-# # Help request
-# NATS.request('help') { |response|
-# puts "Got a response: '#{response}'"
-# }
-#
-# # Wildcard Subscriptions
-#
-# # '*" matches any token
-# NATS.subscribe('foo.*.baz') { |msg, _, sub| puts "Msg received on [#{sub}] : '#{msg}'" }
-#
-# # '>" can only be last token, and matches to any depth
-# NATS.subscribe('foo.>') { |msg, _, sub| puts "Msg received on [#{sub}] : '#{msg}'" }
-#
-#
-# # Stop using NATS.stop, exits EM loop if NATS.start started it
-# NATS.stop
-#
-# end
-#
-# </tt>
-
-
module NATS
- # Version <b>0.3.12</b>
VERSION = "0.3.12".freeze
- # Default port: <b>4222</b>
DEFAULT_PORT = 4222
-
- # Default URI to connect to the server, <b>nats://localhost:4222</b>
DEFAULT_URI = "nats://localhost:#{DEFAULT_PORT}".freeze
- # Max attempts at a reconnect: <b>10</b>
MAX_RECONNECT_ATTEMPTS = 10
-
- # Maximum time to wait for a reconnect: <b>2 seconds</b>
RECONNECT_TIME_WAIT = 2
# Protocol
+ # @private
MSG = /^MSG\s+(\S+)\s+(\S+)\s+((\S+)\s+)?(\d+)$/i #:nodoc:
OK = /^\+OK/i #:nodoc:
ERR = /^-ERR\s+('.+')?/i #:nodoc:
@@ -106,19 +49,28 @@ class << self
# Create and return a connection to the server with the given options. The server will be autostarted if needed if
# the <b>uri</b> is determined to be local. The optional block will be called when the connection has been completed.
#
- def connect(options = {}, &blk)
- options[:uri] ||= ENV['NATS_URI'] || DEFAULT_URI
- options[:debug] ||= ENV['NATS_DEBUG']
- options[:autostart] = (ENV['NATS_AUTO'] || true) unless options[:autostart] != nil
- uri = options[:uri] = URI.parse(options[:uri])
+ # @param [Hash] opts
+ # @option opts [String] :uri The URI to connect to, example nats://localhost:4222
+ # @option opts [Boolean] :autostart Boolean that can be used to suppress autostart functionality.
+ # @option opts [Boolean] :debug Boolean that can be used to output additional debug information.
+ # @param [Block] &blk called when the connection is completed. Connection will be passed as an arg to the block.
+ # @return [NATS] connection to the server.
+
+ def connect(opts = {}, &blk)
+ opts[:uri] ||= ENV['NATS_URI'] || DEFAULT_URI
+ opts[:debug] ||= ENV['NATS_DEBUG']
+ opts[:autostart] = (ENV['NATS_AUTO'] || true) unless opts[:autostart] != nil
+ uri = opts[:uri] = URI.parse(opts[:uri])
@err_cb = proc { raise Error, "Could not connect to server on #{uri}."} unless err_cb
- check_autostart(uri) if options[:autostart]
- client = EM.connect(uri.host, uri.port, self, options)
+ check_autostart(uri) if opts[:autostart]
+ client = EM.connect(uri.host, uri.port, self, opts)
client.on_connect(&blk) if blk
return client
end
- # Create a default client connection to the server. See connect for more information.
+ # Create a default client connection to the server.
+ # @see NATS::connect
+
def start(*args, &blk)
@reactor_was_running = EM.reactor_running?
unless (@reactor_was_running || blk)
@@ -128,56 +80,72 @@ def start(*args, &blk)
end
# Close the default client connection and optionally call the associated block.
+ # @param [Block] &blk called when the connection is closed.
+
def stop(&blk)
client.close if (client and client.connected?)
blk.call if blk
end
# Set the default on_error callback.
+ # @param [Block] &callback called when an error has been detected.
+
def on_error(&callback)
@err_cb, @err_cb_overridden = callback, true
end
- # Publish a message using the default client connection. See NATS#publish for more information.
+ # Publish a message using the default client connection.
+ # @see NATS#publish
+
def publish(*args, &blk)
(@client ||= connect).publish(*args, &blk)
end
- # Subscribe using the default client connection. See NATS#subscribe for more information.
+ # Subscribe using the default client connection.
+ # @see NATS#subscribe
+
def subscribe(*args, &blk)
(@client ||= connect).subscribe(*args, &blk)
end
# Cancel a subscription on the default client connection.
+ # @see NATS#unsubscribe
+
def unsubscribe(*args)
(@client ||= connect).unsubscribe(*args)
end
- # Publish a message and wait for a response on the default client connection. See NATS#request for more information.
+ # Publish a message and wait for a response on the default client connection.
+ # @see NATS#request
+
def request(*args, &blk)
(@client ||= connect).request(*args, &blk)
end
- # Returns a subject that can be used for "directed" communications, utilized in #request.
+ # Returns a subject that can be used for "directed" communications.
+ # @return [String]
+
def create_inbox
v = [rand(0x0010000),rand(0x0010000),rand(0x0010000),
rand(0x0010000),rand(0x0010000),rand(0x1000000)]
"_INBOX.%04x%04x%04x%04x%04x%06x" % v
end
- def check_autostart(uri) #:nodoc:
+ private
+
+ def check_autostart(uri)
return if uri_is_remote?(uri) || @@tried_autostart[uri]
@@tried_autostart[uri] = true
return if server_running?(uri)
return unless try_autostart_succeeded?(uri)
wait_for_server(uri)
end
- def uri_is_remote?(uri) #:nodoc:
+ def uri_is_remote?(uri)
uri.host != 'localhost' && uri.host != '127.0.0.1'
end
- def try_autostart_succeeded?(uri) #:nodoc:
+ def try_autostart_succeeded?(uri)
port_arg = "-p #{uri.port}"
user_arg = "--user #{uri.user}" if uri.user
pass_arg = "--pass #{uri.password}" if uri.password
@@ -188,15 +156,15 @@ def try_autostart_succeeded?(uri) #:nodoc:
$? == 0
end
- def wait_for_server(uri) #:nodoc:
+ def wait_for_server(uri)
start = Time.now
while (Time.now - start < 5) # Wait 5 seconds max
break if server_running?(uri)
sleep(0.1)
end
end
- def server_running?(uri) #:nodoc:
+ def server_running?(uri)
require 'socket'
s = TCPSocket.new(uri.host, uri.port)
s.close
@@ -224,16 +192,23 @@ def initialize(options)
end
# Publish a message to a given subject, with optional reply subject and completion block
- def publish(subject, data=EMPTY_MSG, opt_reply=nil, &blk)
+ # @param [String] subject
+ # @param [Object, #to_s] msg
+ # @param [String] opt_reply
+ # @param [Block] blk, closure called when publish has been processed by the server.
+ def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk)
return unless subject
- data = data.to_s
- send_command("PUB #{subject} #{opt_reply} #{data.bytesize}#{CR_LF}#{data}#{CR_LF}")
+ msg = msg.to_s
+ send_command("PUB #{subject} #{opt_reply} #{msg.bytesize}#{CR_LF}#{msg}#{CR_LF}")
queue_server_rt(&blk) if blk
end
# Subscribe to a subject with optional wildcards. Messages will be delivered to the supplied callback.
# Callback can take any number of the supplied arguments as defined by the list: msg, reply, sub.
- # Returns subscription id which can be passed to NATS#unsubscribe.
+ # Returns subscription id which can be passed to #unsubscribe.
+ # @param [String] subject, optionally with wilcards.
+ # @param [Block] callback, called when a message is delivered.
+ # @return [Object] sid, Subject Identifier
def subscribe(subject, &callback)
return unless subject
@ssid += 1
@@ -243,13 +218,17 @@ def subscribe(subject, &callback)
end
# Cancel a subscription.
+ # @param [Object] sid
def unsubscribe(sid)
@subs.delete(sid)
send_command("UNSUB #{sid}#{CR_LF}")
end
# Send a request and have the response delivered to the supplied callback.
- # Returns subscription id which can be passed to NATS#unsubscribe.
+ # @param [String] subject
+ # @param [Object] msg
+ # @param [Block] callback
+ # @return [Object] sid
def request(subject, data=nil, &cb)
return unless subject
inbox = NATS.create_inbox
@@ -265,16 +244,19 @@ def request(subject, data=nil, &cb)
end
# Define a callback to be called when the client connection has been established.
+ # @param [Block] callback
def on_connect(&callback)
@connect_cb = callback
end
# Define a callback to be called when errors occur on the client connection.
+ # @param [Block] &blk called when the connection is closed.
def on_error(&callback)
@err_cb, @err_cb_overridden = callback, true
end
# Define a callback to be called when a reconnect attempt is being made.
+ # @param [Block] &blk called when the connection is closed.
def on_reconnect(&callback)
@reconnect_cb = callback
end
@@ -285,7 +267,7 @@ def close
close_connection_after_writing
end
- def user_err_cb? #:nodoc:
+ def user_err_cb? # :nodoc:
err_cb_overridden || NATS.err_cb_overridden
end

0 comments on commit 51844d7

Please sign in to comment.