Библиотека rabbitmq для работы с сервером RabbitMQ является вспомогательной и построена поверх
github.com/rabbitmq/amqp091-go
. В неё добавлено понятие инициализаторов
каналов соединения: на их основе осуществляется автоматическое восстановление подключения к серверу и восстановление
состояния.
Для установки соединения с сервером и запуска обработчиков используется метод Run
(синхронный) или
Init
(асинхронный), которые позволяют задать несколько инициализаторов Initializer
. Эти обработчики будут
вызываться при каждой установке соединения, чтобы восстановить топологию и заново проинициализировать работу
своих сервисов.
В библиотеки представлены два генератора таких инициализаторов: Consume
для обработки входящих сообщений и
Publish
для публикации. Для инициализации одновременной обработки входящих событий и публикации новых можно
воспользоваться вспомогательной функцией Work
.
const queueName = "test.queue" // название очереди с сообщениями
queue := rabbitmq.NewQueue(queueName) // создаём описание очереди
handler := func(msg amqp091.Delivery) { // обработчик входящих сообщений
fmt.Println("->", msg.MessageId)
}
// подключаемся к серверу и запускаем автоматическую обработку входящих сообщений
pubFunc, err := rabbitmq.Work(ctx, addr, queue, handler)
if err != nil {
panic(err)
}
// формируем сообщение для отправки
msg := amqp091.Publishing{
MessageId: "msg.test",
ContentType: "text/plain",
Body: []byte("data"),
}
// вызываем функцию публикации
err = pubFunc(ctx, "", queueName, msg)
if err != nil {
panic(err)
}