Toolkit for building processing pipelines using Unix Pipes and AMQP messages
Latest commit f28e6c9 Jun 27, 2012 Jonathan Hoskin Rebuilt for 1.4.4


Pipeline Toolkit

by vWorkApp

 |^^:^^^^:Mw^^^|    Pipeline Toolkit   
 |  :   .:Mw;  |    By vWorkApp        
 \-------------/    (c) 2009           
  | :    :Mw; |                        
  | :    :Mw; |                        
  | :    :Mw; |                        
  | :   .:Mw; |                        

Command line tools for processing messages by constructing a pipeline of machines. AMQP and Unix pipes are used to construct the pipeline. Messages are simple Hashes (serialized as JSON) so they can hold any values and change throughout the processing.
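
To make the message format concrete, here is a minimal sketch of a Hash being serialized to JSON, picked up by a later stage, and extended with a new value. The keys (`id`, `body`, `word_count`) are invented for illustration and are not fields the toolkit requires.

```ruby
require 'json'

# A message is a plain Hash; these keys are made up for illustration.
message = { "id" => 42, "body" => "raw payload" }

# Serialize to a single line of JSON, as it might travel between machines.
wire = JSON.generate(message)

# A later machine parses the line, adds its own value, and re-serializes.
received = JSON.parse(wire)
received["word_count"] = received["body"].split.size
forwarded = JSON.generate(received)

puts forwarded
```

Because the message is just a Hash, each machine can add or change keys without the other stages needing a fixed schema.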


  • Processing acknowledgments, ensuring that a message is only disposed of once it has been successfully processed
  • Performance. Messages are moved through the pipeline fast
  • Command line tools for:
    • Subscribing to messages from an AMQP queue
    • Pushing messages back onto an AMQP exchange
    • Monitoring performance (see msg_probe)
  • A base module ({MessageCommand}) to include into your own classes to quickly make machines.


 > (sudo) gem install pipeline_toolkit


It is assumed that you have:

  • An AMQP message server to pop and push messages to (e.g.

  • A *nix system, such as Linux or Mac OS X.

  • The 'uuidgen' command-line tool. This is installed by default on OS X, and is available through most package managers (e.g. it is the "uuid-runtime" package in aptitude).

All gem dependencies are installed automatically, but if you're curious, check out pipeline_toolkit.gemspec


  1. Create your machine

    class MyMachine
      include MessageCommand

      def initialize_machine
        # Optional: set up any dependencies required by your machine
      end

      def process_standard(message)
        # Required: handle the message within your machine
        unless failed_to_process?
          pass_on(message)  # or alternatively acknowledge if this
                            # is the last machine in the pipeline.
        else
          # handle the failed message here (however you deem fit)
        end
      end
    end

  2. Hook it up to an AMQP message source

    msg_subscribe -x raw -t topic -q octane | my_machine.rb | msg_push -x standard -q standard
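
As a rough picture of what sits in the middle of such a pipeline, the sketch below shows the general Unix-filter shape: read a message from one stream, change it, write it to the next. It assumes one JSON message per line, which is an assumption for illustration rather than the toolkit's documented wire format, and `run_filter` and the `seen_by` key are hypothetical names, not part of MessageCommand.

```ruby
require 'json'
require 'stringio'

# Hypothetical stand-in for a machine. This is NOT the toolkit's
# MessageCommand implementation, only the general filter shape.
# Assumes one JSON message per line.
def run_filter(input, output)
  input.each_line do |line|
    next if line.strip.empty?            # ignore blank lines
    message = JSON.parse(line)
    message["seen_by"] = "my_filter"     # illustrative field name
    output.puts(JSON.generate(message))  # hand on to the next stage
  end
end

# Demonstrate with in-memory streams so the sketch runs on its own.
source = StringIO.new(%({"id":1}\n{"id":2}\n))
sink = StringIO.new
run_filter(source, sink)
print sink.string
```

In a shell pipeline you would pass `$stdin` and `$stdout` instead of the StringIO objects.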

You can learn more about the command line tools and what options are available by using their help command.

> msg_subscribe --help  
> msg_push --help  
> msg_sink --help  
> msg_probe --help  


To try the example, open two terminal windows and run one of the following commands in each:

> msg_generator | msg_push -x test-exchange  
> msg_subscribe -a false --http-port 9090 -x test-exchange -q test_queue > /dev/null

The example does the following:

  1. Generate a bunch of sample messages
  2. Push them to an AMQP exchange
  3. Hook up a pipe to consume the generated messages
  4. Push results to /dev/null.
  5. Go to http://localhost:9090 to see message throughput.

Now let's switch on acknowledgements:

> msg_generator | msg_push -x test-exchange  
> msg_subscribe -x test-exchange -q test_queue | msg_sink

This example does the same as above, but this time with acknowledgements switched on. This guarantees that a message isn't removed from the message server until handled. msg_sink acknowledges all messages.
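
The guarantee can be pictured with a toy in-memory model: a delivered message stays with the queue until it is acknowledged, and anything left unacknowledged is put back for redelivery. This is a conceptual sketch only. `ToyQueue` and its methods are hypothetical names, and the code is neither AMQP nor the toolkit's own implementation.

```ruby
# Toy in-memory queue that keeps a delivered message until it is acknowledged.
# Conceptual model only; all names here are hypothetical.
class ToyQueue
  def initialize(messages)
    @ready = messages.dup
    @unacked = {}
    @next_id = 0
  end

  # Hand out the next message with an ack id; the queue still holds it.
  def deliver
    return nil if @ready.empty?
    @next_id += 1
    @unacked[@next_id] = @ready.shift
    [@next_id, @unacked[@next_id]]
  end

  # The final stage confirms success, so the message can be dropped.
  def ack(ack_id)
    @unacked.delete(ack_id)
  end

  # Anything delivered but never acknowledged goes back for redelivery.
  def requeue_unacked
    @ready.unshift(*@unacked.values)
    @unacked.clear
  end

  # Messages the queue is still responsible for.
  def pending
    @ready.size + @unacked.size
  end
end

queue = ToyQueue.new(["a", "b"])
id_a, _ = queue.deliver
_id_b, _ = queue.deliver
queue.ack(id_a)          # "a" was fully processed
queue.requeue_unacked    # "b" was not, so it is kept
puts queue.pending       # prints 1
```

In the pipeline above, msg_sink plays the role of the caller that sends the final acknowledgement.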

Pre-canned cucumber steps

Pipeline toolkit also includes pre-canned cucumber steps for you to test your machines with. The following steps are included:

    Given I run this machine with acknowledgements:
    Given I run this machine without acknowledgements:

    When I input these messages: (table)
    When I input these messages with ack_ids: (table)
    When I input these messages without ack_ids: (table)

    When I input these messages: (json string)
    When I input these messages with ack_ids: (json string)
    When I input these messages without ack_ids: (json string)

    Then these messages are passed on: (table)
    Then these messages are acknowledged: (table)

    Given an amqp server running on "localhost" at port 1234
    Given amqp queue "queue-name" is empty
    When I input these messages to exchange "queue-name"
    Then these messages are on queue "queue-name"

The steps are defined in the following files, so have a look at them to get more details


To make the steps available to your application's cucumber features, simply put the following at the top of your env.rb file (or any file under features/support).

require 'pipeline_toolkit/cucumber'


Pipeline toolkit uses the Log4r framework for logging. This lets you output your logging to syslog, send email, tweet, dispatch a carrier pigeon, etc. See Log4r for more. To use it, simply call any one of "debug", "info", "warn", "error", or "fatal" on the DefaultLogger class. For example:

DefaultLogger.debug("Hi there!")

The logger is initialized automatically and can be configured using one of the following three methods:

  1. Using the built-in defaults -- this is what you get if you use nothing else. The defaults are:

    • Output to stderr and the file 'pipeline_development.log' in the execution directory
    • Log level is DEBUG
  2. Load the logging configuration from a file called "log_config.yml" in your execution directory. See the Log4r rdoc for more details.

  3. Tweak the defaults (such as where the log_config.yml is loaded from) by calling DefaultLogger.init_logger before making any logging calls. The following options are available:

    # Used to build the file name. Made available in config YML as #{ENV}
    options[:env] ||= "development" 
    # Logger name. Used to build the file name. Made available in config YML as #{NAME}
    options[:log_name] ||= "pipeline" 
    # Log level. Made available in config YML as #{LEVEL}
    options[:log_level] ||= "DEBUG" 
    # Name of logger to use. If you specify multiple loggers in your YML, you can select which one to use here.
    options[:logger_name] ||= "log" 
    # File to read configuration from.
    options[:log_config_file] ||= "log_config.yml" 
  4. One difference with vanilla log4r is that if trace is enabled in the logger you define, the logging methods will automatically add the trace at the end of any string you pass for logging, regardless of what formatter you use. This is to work around the trace always appearing to come from DefaultLogger.method_missing.
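
To see how the `||=` defaults interact with values you pass in, here is a small standalone sketch. `logger_options` and `log_file_name` are hypothetical helpers written for this illustration, not methods of the gem, and the file name pattern is inferred from the default 'pipeline_development.log' rather than taken from the source code.

```ruby
# Hypothetical helper mirroring the documented defaults. It is not the gem's
# init_logger, just the same `||=` pattern in isolation.
def logger_options(options = {})
  options = options.dup                      # leave the caller's Hash alone
  options[:env]             ||= "development"
  options[:log_name]        ||= "pipeline"
  options[:log_level]       ||= "DEBUG"
  options[:logger_name]     ||= "log"
  options[:log_config_file] ||= "log_config.yml"
  options
end

# Assumed file name pattern, inferred from 'pipeline_development.log'.
def log_file_name(options)
  "#{options[:log_name]}_#{options[:env]}.log"
end

opts = logger_options(:env => "production", :log_level => "WARN")
puts log_file_name(opts)   # prints pipeline_production.log
puts opts[:log_level]      # prints WARN
```

Only the keys you supply are overridden; everything else falls back to the defaults listed above.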


  • Add tmp path as an argument (currently hardcoded to /tmp)

  • Tidy up error handling. Should raise custom (and more descriptive) error messages. Particularly for broken pipes

  • Refactor /lib/amqp modules. Don't like how they assume an 'options' variable exists

  • Write tests around queue setup for msg_push and msg_sub