viartemev/the-white-rabbit

The White Rabbit

License: MIT

The White Rabbit is a fast and asynchronous RabbitMQ (AMQP) client library based on Kotlin coroutines. Currently the following features are supported:

  • Queue and exchange manipulations
  • Message publishing with confirmation
  • Message consuming with acknowledgment
  • Transactional publishing and consuming
  • RPC pattern

Adding to project

Gradle
repositories {
    jcenter()
}

compile 'com.viartemev:the-white-rabbit:$version'
Maven
<repositories>
    <repository>
        <id>jcenter</id>
        <url>https://jcenter.bintray.com/</url>
    </repository>
</repositories>

<dependency>
  <groupId>com.viartemev</groupId>
  <artifactId>the-white-rabbit</artifactId>
  <version>${version}</version>
</dependency>

Usage notes and examples

Use one of the extension methods on com.rabbitmq.client.Connection to get the kind of channel you need:

connection.channel { 
    /*
    The plain channel with consumer acknowledgments, supports:
        -- queue and exchange manipulations
        -- asynchronous consuming
        -- RPC pattern
     */
}

connection.confirmChannel {
    /*
    Channel with publisher confirmations, additionally supports:
        -- asynchronous message publishing
     */
}

connection.txChannel { // transactional support
    /*
    Supports transactional publishing and consuming.
     */
}

Queue and exchange manipulations

Asynchronous exchange declaration

connection.channel.declareExchange(ExchangeSpecification(EXCHANGE_NAME))

Asynchronous queue declaration

connection.channel.declareQueue(QueueSpecification(QUEUE_NAME))

Asynchronous queue binding to an exchange

connection.channel.bindQueue(BindQueueSpecification(EXCHANGE_NAME, QUEUE_NAME))

Asynchronous message publishing with confirmation

connection.confirmChannel {
    publish {
        val messages = (1..n).map { createMessage("Hello #$it") }
        publishWithConfirmAsync(coroutineContext, messages).awaitAll()
    }
}

or

connection.confirmChannel {
     publish {
        coroutineScope {
            val messages = (1..n).map { createMessage("Hello #$it") }
            messages.map { async { publishWithConfirm(it) } }
        }
    }
}
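
To make clearer what "confirmation" involves at the protocol level: in confirm mode the broker acknowledges each published message by its sequence number, and a single ack or nack with the multiple flag set covers every sequence number up to and including the given one. The sketch below shows one way that bookkeeping could be done with plain JDK types. ConfirmTracker and its methods are hypothetical names chosen for illustration, not part of the library, and this is not a description of how the library itself is implemented.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentSkipListMap

// Hypothetical helper: tracks unconfirmed messages by publish sequence number.
class ConfirmTracker {
    private val pending = ConcurrentSkipListMap<Long, CompletableFuture<Boolean>>()

    // Call before publishing the message that will carry this sequence number.
    fun register(seqNo: Long): CompletableFuture<Boolean> {
        val future = CompletableFuture<Boolean>()
        pending[seqNo] = future
        return future
    }

    // Call from the ack callback (acked = true) or the nack callback (acked = false).
    // With multiple = true, every sequence number up to and including deliveryTag is settled.
    fun handle(deliveryTag: Long, multiple: Boolean, acked: Boolean) {
        if (multiple) {
            val settled = pending.headMap(deliveryTag, true)
            settled.values.forEach { it.complete(acked) }
            settled.clear()
        } else {
            pending.remove(deliveryTag)?.complete(acked)
        }
    }

    // Number of messages still waiting for a confirmation.
    fun outstanding(): Int = pending.size
}

fun main() {
    val tracker = ConfirmTracker()
    val futures = (1L..3L).map { tracker.register(it) }

    tracker.handle(deliveryTag = 2, multiple = true, acked = true)
    println(futures.map { it.isDone }) // [true, true, false]

    tracker.handle(deliveryTag = 3, multiple = false, acked = false)
    println(futures[2].get())          // false
}
```

A coroutine-based client could suspend on each future instead of blocking, which is roughly the shape of awaiting a list of confirmations.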

Asynchronous message consuming with acknowledgment

Consume only n messages:

connection.channel {
    consume(QUEUE_NAME, PREFETCH_COUNT) {
        (1..n).map { async { consumeMessageWithConfirm({ println(it) }) } }.awaitAll()
    }
}
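
The idea behind consuming with acknowledgment is that a delivery is acknowledged only after its handler has finished, while the prefetch count bounds how many unacknowledged deliveries the broker will send at once. The sketch below illustrates the ack-after-processing rule with a hypothetical Acker interface standing in for the channel's acknowledgment calls. Both the names and the choice to requeue a failed message are assumptions for illustration and may differ from what the library does.

```kotlin
// Hypothetical stand-in for a channel's acknowledgment calls; not a type from the library.
interface Acker {
    fun ack(deliveryTag: Long)
    fun nack(deliveryTag: Long, requeue: Boolean)
}

// Runs [handler] and acknowledges the delivery only if it returned normally.
// Assumption: a failed delivery is rejected with requeue = true.
fun Acker.handleDelivery(deliveryTag: Long, body: ByteArray, handler: (ByteArray) -> Unit) {
    val succeeded = try {
        handler(body)
        true
    } catch (e: Exception) {
        false
    }
    if (succeeded) ack(deliveryTag) else nack(deliveryTag, requeue = true)
}

// Test double that records what was acknowledged or rejected.
class RecordingAcker : Acker {
    val log = mutableListOf<String>()
    override fun ack(deliveryTag: Long) { log += "ack $deliveryTag" }
    override fun nack(deliveryTag: Long, requeue: Boolean) { log += "nack $deliveryTag requeue=$requeue" }
}

fun main() {
    val acker = RecordingAcker()
    acker.handleDelivery(1, "ok".toByteArray()) { println(String(it)) }
    acker.handleDelivery(2, "bad".toByteArray()) { error("cannot process") }
    println(acker.log) // [ack 1, nack 2 requeue=true]
}
```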

Transactional publishing and consuming

RabbitMQ, and AMQP itself, offer rather limited support for transactions. When considering transactions, be aware that:

  • a transaction can span only one channel and one queue;
  • com.rabbitmq.client.Channel is not thread-safe;
  • a channel can be in either confirm mode or transaction mode, never both at once;
  • transactions cannot be nested.

The library provides a convenient way to publish and receive transactionally through the transaction extension function. This function commits the transaction when the block completes normally and rolls it back if a RuntimeException occurs. Exceptions are always propagated to the caller. Coroutines are not used for transactional publishing, though, since no asynchronous operations are involved.

connection.txChannel {
    transaction {
        val message = createMessage(queue = oneTimeQueue, body = "Hello from tx")
        publish(message)
    }
}
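
The commit-or-rollback behaviour described above can be sketched in a few lines. TxOps and inTransaction are hypothetical names used only for this illustration. The sketch assumes the channel has already been switched to transaction mode, and it is not the library's actual implementation.

```kotlin
// Hypothetical stand-in for a channel that is already in transaction mode.
interface TxOps {
    fun txCommit()
    fun txRollback()
}

// Commits when [block] completes normally; rolls back and rethrows
// when a RuntimeException escapes the block.
fun <T> TxOps.inTransaction(block: () -> T): T {
    val result = try {
        block()
    } catch (e: RuntimeException) {
        txRollback()
        throw e
    }
    txCommit()
    return result
}

// Test double that records the commands it receives.
class RecordingTx : TxOps {
    val calls = mutableListOf<String>()
    override fun txCommit() { calls += "commit" }
    override fun txRollback() { calls += "rollback" }
}

fun main() {
    val ok = RecordingTx()
    ok.inTransaction { "published" }
    println(ok.calls) // [commit]

    val failing = RecordingTx()
    try {
        failing.inTransaction<Unit> { throw IllegalStateException("boom") }
    } catch (e: IllegalStateException) {
        println(failing.calls) // [rollback]
    }
}
```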

RPC pattern

connection.channel {
    val message = RabbitMqMessage(MessageProperties.PERSISTENT_BASIC, "Hello world".toByteArray())
    coroutineScope {
        (1..10).map {
            async {
                rpc {
                    call(requestQueueName = "rpc_request", message = message)
                        .also { println("Reply: ${String(it.body)}") }
                }
            }
        }.awaitAll()
    }
}
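
In the RPC pattern over RabbitMQ, each request carries a correlation id and a reply queue name, and the caller matches incoming replies to outstanding calls by that id. The sketch below shows only that matching step, using JDK types. RpcCorrelator is a hypothetical name for illustration, and whether the library uses a shared reply queue in this way is an assumption.

```kotlin
import java.util.UUID
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: matches replies from a reply queue to the calls waiting for them.
class RpcCorrelator {
    private val pending = ConcurrentHashMap<String, CompletableFuture<ByteArray>>()

    // Returns the correlation id to send with the request and a future for the reply.
    fun newCall(): Pair<String, CompletableFuture<ByteArray>> {
        val correlationId = UUID.randomUUID().toString()
        val reply = CompletableFuture<ByteArray>()
        pending[correlationId] = reply
        return correlationId to reply
    }

    // Call for each message consumed from the reply queue.
    // Returns false when the correlation id is unknown (for example a stale or duplicate reply).
    fun onReply(correlationId: String, body: ByteArray): Boolean =
        pending.remove(correlationId)?.complete(body) ?: false
}

fun main() {
    val correlator = RpcCorrelator()
    val (firstId, firstReply) = correlator.newCall()
    val (secondId, secondReply) = correlator.newCall()

    // Replies may arrive in any order.
    correlator.onReply(secondId, "two".toByteArray())
    correlator.onReply(firstId, "one".toByteArray())

    println(String(firstReply.get()))  // one
    println(String(secondReply.get())) // two
    println(correlator.onReply("unknown", ByteArray(0))) // false
}
```

Keying pending calls by correlation id is what lets several concurrent calls, like the ten launched in the example above, share one channel without mixing up their replies.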