Fleck is a Ruby gem for comunication over RabbitMQ. It implements both Fleck::Consumer
for messages consumption from RabbitMQ queues and
Fleck::Client
for making RPC (Remote Procedure Call) and asynchronous calls.
Add this line to your application's Gemfile:
gem 'fleck'
And then execute:
$ bundle
Or install it yourself as:
$ gem install fleck
Before using Fleck you might want to configure it. For doing that you could use as example the code below:
require 'fleck'
# configure defaults for fleck
Fleck.configure do |config|
config.loglevel = Logger::INFO # log level
config.logfile = STDOUT # the file where to write the logs
config.progname = 'MyApp' # the progname prefix to use in logs
config.default_host = '127.0.0.1' # default host to use for connections to RabbitMQ
config.default_port = 5672 # default port to use for connections to RabbitMQ
config.default_user = 'guest' # default user to use for connections to RabbitMQ
config.default_pass = 'guest' # default password to use for connections to RabbitMQ
config.default_vhost = '/' # default vhost to use for connections to RabbitMQ
config.default_queue = 'default' # default queue name to use in consumers, when not specified
end
You could use Fleck for both making requests and consuming requests from the queues. Now we are going to see how to enqueue a request to a specific queue:
ACTION = 'do_something' # the action to be executed by the consumer
QUEUE = 'my.queue' # the name of the queue where to enqueue the request
HEADERS = {my_header: 'a header'} # the headers of the request
PARAMS = {parameter: 'a parameter'} # the parameters of the request
ASYNC = false # a flag to indicate if the request is async or not
connection = Fleck.connection(host: '127.0.0.1', port: 5672, user: 'guest', pass: 'guest', vhost: '/')
client = Fleck::Client.new(connection, QUEUE)
response = client.request(action: ACTION, headers: HEADERS, params: PARAMS, async: ASYNC)
response.status # => returns the status code of the response
response.headers # => returns the headers Hash of the response
response.body # => returns the body of the response
response.errors # => returns the Array of errors
All the options of the requests are optional. The available options for request are:
action
- (default: nil) - used to indicate the action to be executed by the consumerheaders
- (default:{}
) - allows to set headers for the requestparams
- (default:{}
) - allows to set the parameters of the requestasync
- (default:false
) - indicates if the request should be executed asynchronouslytimeout
- (default:nil
) - when set, indicates the request timeout in seconds after which the request will be canceledqueue
- (default:<client queue>
) - allows to specify a different queue where to enqueue the request
You might want to process the response of asynchronous requests when the response is ready. In that case you could pass a block to the request, so that the block is called when the response is completed:
client.request(action: 'do_something', headers: {}, params: {param1: 'myparam'}, async: true) do |request, response|
if response.status == 200
puts "#{response.status} #{response.body}"
else
puts "#{response.status} #{response.errors.join(", ")}"
end
end
By default Fleck::Client
will use the default exchage, which is a :direct
exchange named ""
. But, if you need a different type of exchage,
you could specify it by setting :exchange_type
amd :exchange_name
options when creating the client.
connection = Fleck.connection(host: '127.0.0.1', port: 5672, user: 'guest', pass: 'guest', vhost: '/') # get a connection
client = Fleck::Client.new(connection, 'my.queue', exchange_type: :fanout, exchange_name: 'my.fanout.exchange') # create a new client
# make a request
client.request(action: 'task', params: {x: 1, y: 2}, async: true, timeout: 5) do |request, response|
if response.status == 200
# we did it!
puts response.body
else
# something went wrong
puts "Something went wrong!"
end
end
Sometimes you might need to receive multiple responses to a single request, for example if you're using a :fanout
exchange, and
there're multiple consumer that will respond to your request. The common request <--> response
model won't match this situation,
because after the first response the request will be terminated, that will cause a warning message for each response received after
the first response. To solve this problem you could use the :multiple_responses
option on client creation (by default is set to false
),
so that the client will be able to manage multiple responses.
connection = Fleck.connection(host: '127.0.0.1', port: 5672, user: 'guest', pass: 'guest', vhost: '/') # get a connection
client = Fleck::Client.new(connection, 'my.queue', exchange_type: :fanout, exchange_name: 'my.fanout.exchange', multiple_responses: true) # create a new client
# make a request
client.request(action: 'status', timeout: 5) do |request, response|
# this block will be executed for each received response
if response.status == 200
# we did it!
puts response.body
else
# something went wrong
puts "Something went wrong!"
end
end
NOTE: when you enable the :multiple_responses
option, this will forse async: true
for each request. Furthermore, this will set a default
timeout to 60
seconds, in order to prevent requests that are never completed, which may result in a memory leak. But if you need a request that
is never completed, you could set timeout: nil
when making the request.
To use Fleck::Consumer
all you need is to inherit it by an another class:
class MyConsumer < Fleck::Consumer
configure queue: 'my.queue', concurrency: 2
actions :random
initialize do
# initialization stuff
@my_message = "Hi! :)"
end
def random
if rand > 0.1
response.status = 200 # this is not strictly necessary (200 is the default status)
response.body = {x: rand, y: rand, message: @my_message}
else
response.render_error(500, 'Internal Server Error (just a joke)')
end
end
end
This code will automatically automatically start N
instances of MyConsumer in background (you don't have to do anything), that will start consuming
messages from my.queue
and will respond with a 200 status when the randomly generated number is greater than 0.1
and with a 500 otherwise.
NOTE: the default status code of the response is 200, but if any uncaught exception is raised from within #on_message
method, the status
will automatically change to 500 and will add "Internal Server Error"
message to response errors array.
By default Fleck::Consumer
will use the default exchange to consume messages from a queue. But if you need a different type of exchange, you
can specify it in consumer configuration.
class MyConsumer < Fleck::Consumer
configure queue: '', concurrency: 1, exchange_type: :fanout, exchange_name: 'my.fanout.exchange'
actions :status
def status
response.body = {status: 'up & running'}
end
end
Bug reports and pull requests are welcome on GitHub at https://github.com/serioja90/fleck. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the Contributor Covenant code of conduct.
The gem is available as open source under the terms of the MIT License.