Skip to content
This repository has been archived by the owner on Sep 11, 2022. It is now read-only.

Commit

Permalink
Merge branch 'master' of github.com:ruby-amqp/amqp
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Oct 2, 2012
2 parents d642f8f + 6dc3ce1 commit 1ce3b3d
Show file tree
Hide file tree
Showing 12 changed files with 25 additions and 180 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -14,3 +14,4 @@ vendor
.yardoc/*
doc/*
tmp/*
bin/*
1 change: 1 addition & 0 deletions Gemfile
Expand Up @@ -7,6 +7,7 @@ source :rubygems
def custom_gem(name, options = Hash.new)
local_path = File.expand_path("../vendor/#{name}", __FILE__)
if File.exist?(local_path)
puts "Using #{name} from #{local_path}..."
gem name, options.merge(:path => local_path).delete_if { |key, _| [:git, :branch].include?(key) }
else
gem name, options
Expand Down
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -5,7 +5,7 @@ This library works with Ruby 1.8.7 (*except for p249*, see the FAQ), Ruby 1.9.2,

0.8.0 and later versions of amqp gem implement [AMQP 0.9.1](http://bit.ly/amqp-model-explained) (see also [AMQP 0.9.1 spec document](http://bit.ly/amqp091spec)) and support [RabbitMQ extensions to AMQP 0.9.1](http://www.rabbitmq.com/extensions.html).

[![Continuous Integration status](https://secure.travis-ci.org/ruby-amqp/amqp.png)](http://travis-ci.org/ruby-amqp/amqp)
[![Continuous Integration status](https://secure.travis-ci.org/ruby-amqp/amqp.png?branch=master)](http://travis-ci.org/ruby-amqp/amqp)


## I know what AMQP is, how do I get started? ##
Expand Down Expand Up @@ -190,7 +190,7 @@ amqp gem is maintained by [Michael Klishin](https://github.com/michaelklishin).

* [API reference](http://rdoc.info/github/ruby-amqp/amqp/master/frames)
* [Documentation guides](http://bit.ly/amqp-gem-docs)
* [Code Examples](https://github.com/ruby-amqp/amq-protocol/tree/master/examples/)
* [Code Examples](https://github.com/ruby-amqp/amqp/tree/master/examples)
* [Issue tracker](http://github.com/ruby-amqp/amqp/issues)
* [Continous integration status](http://travis-ci.org/#!/ruby-amqp/amqp)

Expand Down
4 changes: 2 additions & 2 deletions amqp.gemspec
Expand Up @@ -24,8 +24,8 @@ Gem::Specification.new do |s|

# Dependencies
s.add_dependency "eventmachine"
s.add_dependency "amq-client", "~> 1.0.0.pre1"
s.add_dependency "amq-protocol", "~> 1.0.0.pre1"
s.add_dependency "amq-client", "~> 1.0.0.pre2"
s.add_dependency "amq-protocol", "~> 1.0.0.pre6"

begin
require "changelog"
Expand Down
8 changes: 4 additions & 4 deletions docs/GettingStarted.textile
Expand Up @@ -219,7 +219,7 @@ For the sake of simplicity, both the message producer (App I) and the consumer (
Now let us move on to a little bit more sophisticated example.


h2. Blabblr: one-to-many publish/subscribe (pubsub) example
h2. Blabbr: one-to-many publish/subscribe (pubsub) example

The previous example demonstrated how a connection to a broker is made and how to do 1:1 communication
using the default exchange. Now let us take a look at another common scenario: broadcast, or multiple consumers
Expand Down Expand Up @@ -268,7 +268,7 @@ exchange = channel.fanout("nba.scores")
</pre>

The exchange that we declare above using {AMQP::Channel#fanout} is a _fanout exchange_. A fanout exchange delivers messages to all of the queues that
are bound to it: exactly what we want in the case of Blabbr!
are bound to it: exactly what we want in the case of Blabbr.

This piece of code

Expand Down Expand Up @@ -299,11 +299,11 @@ A diagram for Blabbr looks like this:


Next we use EventMachine's {http://eventmachine.rubyforge.org/EventMachine.html#M000466 add_timer} method to
run a piece of code in 1 second from now:
run a piece of code in 2 seconds from now:

<pre>
<code>
EventMachine.add_timer(1) do
EventMachine.add_timer(2) do
exchange.delete

connection.close { EventMachine.stop }
Expand Down
81 changes: 5 additions & 76 deletions lib/amqp/bit_set.rb
@@ -1,80 +1,9 @@
# encoding: utf-8

module AMQP
# Very minimalistic, pure Ruby implementation of bit set. Inspired by java.util.BitSet,
# although significantly smaller in scope.
class BitSet

#
# API
#

ADDRESS_BITS_PER_WORD = 6
BITS_PER_WORD = (1 << ADDRESS_BITS_PER_WORD)
WORD_MASK = 0xffffffffffffffff

# @param [Integer] Number of bits in the set
# @api public
def initialize(nbits)
@nbits = nbits

self.init_words(nbits)
end # initialize(nbits)

# Sets (flags) given bit. This method allows bits to be set more than once in a row, no exception will be raised.
#
# @param [Integer] A bit to set
# @api public
def set(i)
w = self.word_index(i)
@words[w] |= (1 << i)
end # set(i)

# Fetches flag value for given bit.
#
# @param [Integer] A bit to fetch
# @return [Boolean] true if given bit is set, false otherwise
# @api public
def get(i)
w = self.word_index(i)

(@words[w] & (1 << i)) != 0
end # get(i)
alias [] get
require "amq/bit_set"

# Unsets (unflags) given bit. This method allows bits to be unset more than once in a row, no exception will be raised.
#
# @param [Integer] A bit to unset
# @api public
def unset(i)
w = self.word_index(i)
return if w.nil?

@words[w] &= ~(1 << i)
end # unset(i)

# Clears all bits in the set
# @api public
def clear
self.init_words(@nbits)
end # clear


#
# Implementation
#

protected

# @private
def init_words(nbits)
n = word_index(nbits-1) + 1
@words = Array.new(n) { 1 }
end # init_words

# @private
def word_index(i)
i >> ADDRESS_BITS_PER_WORD
end # word_index(i)
end # BitSet
module AMQP
# A forward reference for AMQP::BitSet that was extracted to amq-protocol
# to make it possible to reuse it in Bunny.
BitSet = AMQ::BitSet
end # AMQP
2 changes: 1 addition & 1 deletion lib/amqp/channel.rb
Expand Up @@ -137,7 +137,7 @@ module AMQP
#
# h2. RabbitMQ extensions.
#
# AMQP gem supports several RabbitMQ extensions taht extend Channel functionality.
# AMQP gem supports several RabbitMQ extensions that extend Channel functionality.
# Learn more in {file:docs/VendorSpecificExtensions.textile}
#
# @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2.5)
Expand Down
2 changes: 1 addition & 1 deletion lib/amqp/connection.rb
Expand Up @@ -54,7 +54,7 @@ def self.run(*args, &block)
# Pass it a block if you want a piece of code to be run once default connection
# is successfully closed.
#
# @note If default connection was never estabilished or is in the closing state already,
# @note If default connection was never established or is in the closing state already,
# this method has no effect.
# @api public
def self.stop(reply_code = 200, reply_text = "Goodbye", &block)
Expand Down
2 changes: 1 addition & 1 deletion lib/amqp/exceptions.rb
Expand Up @@ -30,7 +30,7 @@ def initialize(settings, cause = nil)
@settings = settings
@cause = cause

super("Could not estabilish TCP connection to #{@settings[:host]}:#{@settings[:port]}")
super("Could not establish TCP connection to #{@settings[:host]}:#{@settings[:port]}")
end # TCPConnectionFailed
end

Expand Down
95 changes: 4 additions & 91 deletions lib/amqp/int_allocator.rb
@@ -1,96 +1,9 @@
# encoding: utf-8

require "amqp/bit_set"
require "amq/int_allocator"

module AMQP
# Simple bitset-based integer allocator, heavily inspired by com.rabbitmq.utility.IntAllocator class
# in the RabbitMQ Java client.
#
# Unlike monotonically incrementing identifier, this allocator is suitable for very long running programs
# that aggressively allocate and release channels.
class IntAllocator

#
# API
#

# @return [Integer] Number of integers in the allocation range
attr_reader :number_of_bits
# @return [Integer] Upper boundary of the integer range available for allocation
attr_reader :hi
# @return [Integer] Lower boundary of the integer range available for allocation
attr_reader :lo

# @param [Integer] lo Lower boundary of the integer range available for allocation
# @param [Integer] hi Upper boundary of the integer range available for allocation
# @raise [ArgumentError] if upper boundary is not greater than the lower one
def initialize(lo, hi)
raise ArgumentError.new "upper boundary must be greater than the lower one (given: hi = #{hi}, lo = #{lo})" unless hi > lo

@hi = hi
@lo = lo

@number_of_bits = hi - lo
@range = Range.new(1, @number_of_bits)
@free_set = BitSet.new(@number_of_bits)
end # initialize(hi, lo)

# Attempts to allocate next available integer. If allocation succeeds, allocated value is returned.
# Otherwise, nil is returned.
#
# Current implementation of this method is O(n), where n is number of bits in the range available for
# allocation.
#
# @return [Integer] Allocated integer if allocation succeeded. nil otherwise.
def allocate
if n = find_unallocated_position
@free_set.set(n)

n
else
-1
end
end # allocate

# Releases previously allocated integer. If integer provided as argument was not previously allocated,
# this method has no effect.
#
# @return [NilClass] nil
def free(reservation)
@free_set.unset(reservation)
end # free(reservation)
alias release free

# @return [Boolean] true if provided argument was previously allocated, false otherwise
def allocated?(reservation)
@free_set.get(reservation)
end # allocated?(reservation)

# Releases the whole allocation range
def reset
@free_set.clear
end # reset



protected

# This implementation is significantly less efficient
# that what the RabbitMQ Java client has (based on java.lang.Long#nextSetBit and
# java.lang.Long.numberOfTrailingZeros, and thus binary search over bits).
# But for channel id generation, this is a good enough implementation.
#
# @private
def find_unallocated_position
r = nil
@range.each do |i|
if !@free_set.get(i)
r = i
break;
end
end

r
end # find_unallocated_position
end # IntAllocator
# A forward reference for AMQP::IntAllocator that was extracted to amq-protocol
# to make it possible to reuse it in Bunny.
IntAllocator = AMQ::IntAllocator
end # AMQP
2 changes: 1 addition & 1 deletion lib/amqp/version.rb
Expand Up @@ -6,5 +6,5 @@ module AMQP
#
# @see AMQ::Protocol::VERSION
# @return [String] AMQP gem version
VERSION = '1.0.0.pre1'
VERSION = '1.0.0.pre3'
end
3 changes: 2 additions & 1 deletion spec/unit/amqp/connection_spec.rb
Expand Up @@ -28,7 +28,8 @@
:logging => false,
:ssl => false,
:broker => nil,
:frame_max => 131072
:frame_max => 131072,
:heartbeat => 0
}
end

Expand Down

0 comments on commit 1ce3b3d

Please sign in to comment.