Connect UNIX pipes and message queues


pipecat (MIT licensed)

Pipecat allows you to scale any program supporting the FACK contract using traditional UNIX pipes and AMQP. Think of it as netcat but with message acknowledgments. It is the successor of redis-pipe.

# Publish sequence of numbers to a job queue.
seq 1 1000 | pipecat publish numbers

# Multiply each number by 10 and store the results in a different queue.
pipecat consume numbers --autoack | xargs -n 1 expr 10 '*' | pipecat publish results

# Aggregate the results and calculate the sum
pipecat consume results --autoack --non-blocking \
  | python -u -c 'import sys; print(sum(map(int, sys.stdin)))'

If you are into streams and UNIX pipes, check out my Haskell-based awk and sed alternative.

Support

Pipecat supports a local mode and all AMQP 0.9.1 message brokers.

Install

You can download a single binary for Linux, OSX or Windows.

OSX

wget -O pipecat https://github.com/lukasmartinelli/pipecat/releases/download/v0.3/pipecat_darwin_amd64
chmod +x pipecat

./pipecat --help

Linux

wget -O pipecat https://github.com/lukasmartinelli/pipecat/releases/download/v0.3/pipecat_linux_amd64
chmod +x pipecat

./pipecat --help

Install from Source

go get github.com/lukasmartinelli/pipecat

If you are using Windows or a 32-bit architecture, you need to download the appropriate binary yourself.

Using pipecat

pipecat connects message queues and UNIX pipes. The need arose when I started building messaging support into utilities to make them scalable. I wanted to scale the processes reliably while leaving the programs as they are, without heavy dependencies.

In this example we will calculate the sum of a sequence of numbers.

Connect the broker

Set the AMQP_URI environment variable to connect to the message broker.

export AMQP_URI=amqp://user:pass@host:5672/vhost
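To double-check which user, host, port and virtual host such a URI carries, it can be taken apart with Python's standard library. This is only an illustrative sketch: the helper name describe_amqp_uri is made up for this example, and it says nothing about how pipecat itself parses the value.

```python
from urllib.parse import urlsplit, unquote


def describe_amqp_uri(uri):
    """Split an AMQP URI into its parts (hypothetical helper, not part of pipecat)."""
    parts = urlsplit(uri)
    return {
        "scheme": parts.scheme,
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        # Everything after the first "/" is treated as the virtual host name.
        "vhost": unquote(parts.path[1:]),
    }


info = describe_amqp_uri("amqp://user:pass@host:5672/vhost")
print(info["host"], info["port"], info["vhost"])
```

A typo in the port or a missing slash before the vhost shows up immediately in the returned dictionary.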

Create the queue

Let's create a new queue called numbers and publish the numbers from 1 to 1000 to it.

seq 1 1000 | pipecat publish numbers

Process input

Multiply each number by 10 and publish the results to a second queue called results. This step can be run on multiple hosts. The --autoack flag acknowledges all received messages automatically.

pipecat consume numbers --autoack | xargs -n 1 expr 10 '*' | pipecat publish results

Aggregate results

Now let's sum up all the numbers. Because we want to stop after receiving all numbers, we specify the --non-blocking mode, which closes the connection if no messages have been received after a timeout.

pipecat consume results --autoack --non-blocking | python -u -c 'import sys; print(sum(map(int, sys.stdin)))'
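If the one-liner grows, the same aggregation can live in a small script. The sketch below is one possible shape, not something shipped with pipecat: the function name sum_lines is an assumption, and it skips blank lines, which the one-liner would reject.

```python
import io


def sum_lines(lines):
    """Sum one integer per line, ignoring blank lines."""
    total = 0
    for line in lines:
        text = line.strip()
        if text:
            total += int(text)
    return total


# In-memory stand-in for the output of `pipecat consume results`.
sample = io.StringIO("10\n20\n\n30\n")
print(sum_lines(sample))
```

In a real pipeline, pass sys.stdin instead of the in-memory sample.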

Local RabbitMQ with Docker

If you do not have an existing AMQP broker at hand, you can run RabbitMQ in a Docker container, expose its port and connect to it.

docker run -d -p 5672:5672 --hostname pipecat-rabbit --name pipecat-rabbit rabbitmq:3

Now connect to localhost with the default guest login.

export AMQP_URI=amqp://guest:guest@localhost:5672/

Publish messages to Exchange

If you are using existing message queue infrastructure you can also publish messages to an exchange, with the first parameter used as the routing key. Thanks to @kennon for the implementation.

seq 1 1000 | pipecat publish --exchange "my_exchange" --no-create-queue my.routing.key

The AMQP_EXCHANGE environment variable can also be used:

export AMQP_EXCHANGE=my_exchange

Make it failsafe

We have already written a small, concise and very scalable set of programs. We can now run the multiply step on many servers.

However, if a server dies while the multiply step is running, input lines that were already acknowledged with --autoack but not yet fully processed are lost.

If your program must not lose messages, you need to implement the FACK contract, demonstrated below with the multiply.py sample.

FACK Contract

Any program that reads input from stdin and writes to stdout should accept an environment variable FACK containing the name of a file (typically a named pipe). Whenever the operation performed on a line from stdin succeeds, that line should be written to the FACK file.

[Figure: FACK contract flow]

Implement the contract

Implementing the contract is straightforward.

  1. Support the optional FACK environment variable containing a file name.
  2. Write each received input line to that file once the operation on it has succeeded.

Python Example

Below is a Python example, multiply.py, which multiplies the sequence of numbers as above but also writes each input line to stdack once it has been processed successfully.

import sys
import os

with open(os.getenv('FACK', os.devnull), 'w') as stdack: # Works even if FACK is not set
    for line in sys.stdin:
        num = int(line.strip())
        result = num * 10
        sys.stdout.write('{}\n'.format(result))
        stdack.write(line) # Ack the processed line
        stdack.flush() # Make sure line does not get lost in the buffer
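The script above stops with an exception on the first line that is not a number. A possible variant, sketched below, keeps going and acknowledges only the lines it could process. The function name multiply_stream and the choice to skip bad lines are assumptions made for this example; what happens to unacknowledged messages afterwards depends on pipecat and the broker and is not covered here.

```python
import io
import sys


def multiply_stream(stdin, stdout, stdack, factor=10):
    """Multiply each input line by `factor`, acking only lines that succeed."""
    for line in stdin:
        try:
            result = int(line.strip()) * factor
        except ValueError:
            # Assumption: an unparsable line is reported and left unacknowledged.
            sys.stderr.write('skipping invalid line: {!r}\n'.format(line))
            continue
        stdout.write('{}\n'.format(result))
        stdout.flush()
        stdack.write(line)  # Ack only after the result has been written
        stdack.flush()  # Make sure the ack does not get stuck in the buffer


# In-memory demo: the second line is invalid and must not be acked.
out, ack = io.StringIO(), io.StringIO()
multiply_stream(io.StringIO('1\nfoo\n3\n'), out, ack)
print(out.getvalue(), end='')
print(ack.getvalue(), end='')
```

To use it as a filter, call it with sys.stdin, sys.stdout and the file opened from the FACK variable, as in multiply.py.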

Use named pipes for ACKs

With the contract in place, your program can no longer lose messages. Feed the FACK output back into pipecat through a named pipe, and pipecat will acknowledge a message from the queue only after the corresponding line has come back.

[Figure: pipecat flow diagram]

Fill the queue again.

seq 1 1000 | pipecat publish numbers

And use a named pipe to funnel the acknowledged input lines back into pipecat.

mkfifo ack
cat ack | pipecat consume numbers \
| FACK=ack python -u multiply.py \
| pipecat publish results
rm ack
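When the pipeline is started from a Python wrapper rather than a shell, the mkfifo and rm steps can be paired so the pipe is removed even if something fails in between. This is a minimal sketch for Unix systems; the context manager name named_pipe is made up for this example, and launching the pipecat processes themselves is left out.

```python
import contextlib
import os
import stat
import tempfile


@contextlib.contextmanager
def named_pipe(path):
    """Create a FIFO at `path` and remove it again on exit (Unix only)."""
    os.mkfifo(path)
    try:
        yield path
    finally:
        os.remove(path)


# Demo in a temporary directory so no existing file is touched.
demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, 'ack')
with named_pipe(demo_path) as path:
    was_fifo = stat.S_ISFIFO(os.stat(path).st_mode)
still_exists = os.path.exists(demo_path)
os.rmdir(demo_dir)
print(was_fifo, still_exists)
```

Inside the with block, the path can be passed to the worker as its FACK variable.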

Consume all messages to reduce them to a single result. In the reduce operation we need to autoack all received messages, because we cannot hold the entire result set in memory until the operation has completed.

pipecat consume results --autoack --non-blocking | python -u -c 'import sys; print(sum(map(int, sys.stdin)))'

With a few additional lines of code that depend only on the standard library, you can now make any program in any language scalable using message queues, without any extra dependencies and without changing its behavior at all.

Usage Examples

Create local Queue Backup

pipecat consume results --autoack --non-blocking > results_backup.json
cat results_backup.json | pipecat publish results

Cross Compile Release

We use gox to create distributable binaries for Windows, OSX and Linux.

docker run --rm -v "$(pwd)":/usr/src/pipecat -w /usr/src/pipecat tcnksm/gox:1.4.2-light