Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions tutorials/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.

* [Python](tutorials/tutorial-one-python)
* [Java](tutorials/tutorial-one-java)
* [Kotlin](tutorials/tutorial-one-kotlin)
* [Ruby](tutorials/tutorial-one-ruby)
* [PHP](tutorials/tutorial-one-php)
* [C#](tutorials/tutorial-one-dotnet)
Expand All @@ -92,6 +93,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.

* [Python](tutorials/tutorial-two-python)
* [Java](tutorials/tutorial-two-java)
* [Kotlin](tutorials/tutorial-two-kotlin)
* [Ruby](tutorials/tutorial-two-ruby)
* [PHP](tutorials/tutorial-two-php)
* [C#](tutorials/tutorial-two-dotnet)
Expand All @@ -112,6 +114,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.

* [Python](tutorials/tutorial-three-python)
* [Java](tutorials/tutorial-three-java)
* [Kotlin](tutorials/tutorial-three-kotlin)
* [Ruby](tutorials/tutorial-three-ruby)
* [PHP](tutorials/tutorial-three-php)
* [C#](tutorials/tutorial-three-dotnet)
Expand All @@ -134,6 +137,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.

* [Python](tutorials/tutorial-four-python)
* [Java](tutorials/tutorial-four-java)
* [Kotlin](tutorials/tutorial-four-kotlin)
* [Ruby](tutorials/tutorial-four-ruby)
* [PHP](tutorials/tutorial-four-php)
* [C#](tutorials/tutorial-four-dotnet)
Expand All @@ -154,6 +158,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.

* [Python](tutorials/tutorial-five-python)
* [Java](tutorials/tutorial-five-java)
* [Kotlin](tutorials/tutorial-five-kotlin)
* [Ruby](tutorials/tutorial-five-ruby)
* [PHP](tutorials/tutorial-five-php)
* [C#](tutorials/tutorial-five-dotnet)
Expand All @@ -174,6 +179,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.

* [Python](tutorials/tutorial-six-python)
* [Java](tutorials/tutorial-six-java)
* [Kotlin](tutorials/tutorial-six-kotlin)
* [Ruby](tutorials/tutorial-six-ruby)
* [PHP](tutorials/tutorial-six-php)
* [C#](tutorials/tutorial-six-dotnet)
Expand All @@ -191,6 +197,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.
Reliable publishing with publisher confirms

* [Java](tutorials/tutorial-seven-java)
* [Kotlin](tutorials/tutorial-seven-kotlin)
* [C#](tutorials/tutorial-seven-dotnet)
* [PHP](tutorials/tutorial-seven-php)
</td>
Expand Down
292 changes: 292 additions & 0 deletions tutorials/tutorial-five-kotlin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
---
title: RabbitMQ tutorial - Topics
---
<!--
Copyright (c) 2005-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

All rights reserved. This program and the accompanying materials
are made available under the terms of the under the Apache License,
Version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md';
import T5DiagramToC from '@site/src/components/Tutorials/T5DiagramToC.md';
import T5DiagramTopicX from '@site/src/components/Tutorials/T5DiagramTopicX.md';

# RabbitMQ tutorial - Topics

## Topics
### (using the Kotlin Client)

<TutorialsHelp/>

<T5DiagramToC/>

In the [previous tutorial](./tutorial-four-kotlin) we improved our
logging system. Instead of using a `fanout` exchange only capable of
dummy broadcasting, we used a `direct` one, and gained a possibility
of selectively receiving the logs.

Although using the `direct` exchange improved our system, it still has
limitations - it can't do routing based on multiple criteria.

In our logging system we might want to subscribe to not only logs
based on severity, but also based on the source which emitted the log.
You might know this concept from the
[`syslog`](http://en.wikipedia.org/wiki/Syslog) unix tool, which
routes logs based on both severity (info/warn/crit...) and facility
(auth/cron/kern...).

That would give us a lot of flexibility - we may want to listen to just
critical errors coming from 'cron' but also all logs from 'kern'.

To implement that in our logging system we need to learn about a more
complex `topic` exchange.


Topic exchange
--------------

Messages sent to a `topic` exchange can't have an arbitrary
`routing_key` - it must be a list of words, delimited by dots. The
words can be anything, but usually they specify some features
connected to the message. A few valid routing key examples:
"`stock.usd.nyse`", "`nyse.vmw`", "`quick.orange.rabbit`". There can be as
many words in the routing key as you like, up to the limit of 255
bytes.

The binding key must also be in the same form. The logic behind the
`topic` exchange is similar to a `direct` one - a message sent with a
particular routing key will be delivered to all the queues that are
bound with a matching binding key. However there are two important
special cases for binding keys:

* `*` (star) can substitute for exactly one word.
* `#` (hash) can substitute for zero or more words.

It's easiest to explain this in an example:

<T5DiagramTopicX/>

In this example, we're going to send messages which all describe
animals. The messages will be sent with a routing key that consists of
three words (two dots). The first word in the routing key will
describe speed, second a colour and third a species:
"`<speed>.<colour>.<species>`".

We created three bindings: Q1 is bound with binding key "`*.orange.*`"
and Q2 with "`*.*.rabbit`" and "`lazy.#`".

These bindings can be summarised as:

* Q1 is interested in all the orange animals.
* Q2 wants to hear everything about rabbits, and everything about lazy
animals.

A message with a routing key set to "`quick.orange.rabbit`"
will be delivered to both queues. Message "`lazy.orange.elephant`"
also will go to both of them. On the other hand
"`quick.orange.fox`" will only go to the first queue, and
"`lazy.brown.fox`" only to the second. "`lazy.pink.rabbit`" will
be delivered to the second queue only once, even though it matches two bindings.
"`quick.brown.fox`" doesn't match any binding so it will be discarded.

What happens if we break our contract and send a message with one or
four words, like "`orange`" or "`quick.orange.male.rabbit`"? Well,
these messages won't match any bindings and will be lost.

On the other hand "`lazy.orange.male.rabbit`", even though it has four
words, will match the last binding and will be delivered to the second
queue.

> #### Topic exchange
>
> Topic exchange is powerful and can behave like other exchanges.
>
> When a queue is bound with "`#`" (hash) binding key - it will receive
> all the messages, regardless of the routing key - like in `fanout` exchange.
>
> When special characters "`*`" (star) and "`#`" (hash) aren't used in bindings,
> the topic exchange will behave just like a `direct` one.

Putting it all together
-----------------------

We're going to use a `topic` exchange in our logging system. We'll
start off with a working assumption that the routing keys of logs will
have two words: "`<facility>.<severity>`".

The code is almost the same as in the
[previous tutorial](./tutorial-four-kotlin).

The code for `emitLogTopic`:

```kotlin
import dev.kourier.amqp.BuiltinExchangeType
import dev.kourier.amqp.Properties
import dev.kourier.amqp.connection.amqpConfig
import dev.kourier.amqp.connection.createAMQPConnection
import kotlinx.coroutines.CoroutineScope

suspend fun emitLogTopic(coroutineScope: CoroutineScope, routingKey: String, message: String) {
val config = amqpConfig {
server {
host = "localhost"
}
}
val connection = createAMQPConnection(coroutineScope, config)
val channel = connection.openChannel()

channel.exchangeDeclare(
"topic_logs",
BuiltinExchangeType.TOPIC,
durable = false,
autoDelete = false,
internal = false,
arguments = emptyMap()
)

channel.basicPublish(
message.toByteArray(),
exchange = "topic_logs",
routingKey = routingKey,
properties = Properties()
)
println(" [x] Sent '$routingKey':'$message'")

channel.close()
connection.close()
}
```

The code for `receiveLogsTopic`:

```kotlin
import dev.kourier.amqp.BuiltinExchangeType
import dev.kourier.amqp.connection.amqpConfig
import dev.kourier.amqp.connection.createAMQPConnection
import kotlinx.coroutines.CoroutineScope

suspend fun receiveLogsTopic(
coroutineScope: CoroutineScope,
bindingKeys: List<String>
) {
val config = amqpConfig {
server {
host = "localhost"
}
}
val connection = createAMQPConnection(coroutineScope, config)
val channel = connection.openChannel()

channel.exchangeDeclare(
"topic_logs",
BuiltinExchangeType.TOPIC,
durable = false,
autoDelete = false,
internal = false,
arguments = emptyMap()
)

val queueDeclared = channel.queueDeclare(
name = "",
durable = false,
exclusive = true,
autoDelete = true,
arguments = emptyMap()
)
val queueName = queueDeclared.queueName

if (bindingKeys.isEmpty()) {
System.err.println("Usage: receiveLogsTopic [binding_key]...")
return
}

for (bindingKey in bindingKeys) {
channel.queueBind(
queue = queueName,
exchange = "topic_logs",
routingKey = bindingKey
)
}

println(" [*] Waiting for logs. To exit press CTRL+C")

val consumer = channel.basicConsume(queueName, noAck = true)

for (delivery in consumer) {
val message = delivery.message.body.decodeToString()
val routingKey = delivery.message.routingKey
println(" [x] Received '$routingKey':'$message'")
}

channel.close()
connection.close()
}
```

To receive all the logs:

```kotlin
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
receiveLogsTopic(this, listOf("#"))
}
```

To receive all logs from the facility "`kern`":

```kotlin
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
receiveLogsTopic(this, listOf("kern.*"))
}
```

Or if you want to hear only about "`critical`" logs:

```kotlin
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
receiveLogsTopic(this, listOf("*.critical"))
}
```

You can create multiple bindings:

```kotlin
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
receiveLogsTopic(this, listOf("kern.*", "*.critical"))
}
```

And to emit a log with a routing key "`kern.critical`" type:

```kotlin
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
emitLogTopic(this, "kern.critical", "A critical kernel error")
}
```

Have fun playing with these programs. Note that the code doesn't make
any assumption about the routing or binding keys, you may want to play
with more than two routing key parameters.

Move on to [tutorial 6](./tutorial-six-kotlin) to find out how to do a
round trip message as a remote procedure call.
Loading