-
Notifications
You must be signed in to change notification settings - Fork 16
/
cmd_subscribe.go
35 lines (28 loc) · 1023 Bytes
/
cmd_subscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Copyright (C) 2017 Jan Delgado
package main
// Handler for the "subscribe" CLI command.
import (
"crypto/tls"
"os"
"github.com/jandelgado/rabtap/pkg"
)
// CmdSubscribeArg contains arguments for the subscribe command. It is
// consumed by cmdSubscribe, which reads every field listed here.
type CmdSubscribeArg struct {
	// amqpURI is handed to rabtap.NewAmqpSubscriber as the broker address.
	amqpURI string
	// queue is the name of the queue to subscribe to.
	queue string
	// tlsConfig is handed to rabtap.NewAmqpSubscriber together with amqpURI.
	tlsConfig *tls.Config
	// messageReceiveFunc is forwarded to messageReceiveLoop; presumably it is
	// invoked for each received message — confirm against messageReceiveLoop.
	messageReceiveFunc MessageReceiveFunc
	// signalChannel is forwarded to messageReceiveLoop; presumably it lets the
	// loop terminate on an OS signal — confirm against messageReceiveLoop.
	signalChannel chan os.Signal
}
// cmdSubscribe implements the subscribe command: it creates an AMQP
// subscriber for cmd.amqpURI, starts a subscription on cmd.queue in a
// separate goroutine and then runs messageReceiveLoop on the calling
// goroutine until that loop returns. The subscriber is closed on return.
func cmdSubscribe(cmd CmdSubscribeArg) {
	log.Debugf("cmdSub: subscribing to queue %s", cmd.queue)

	sub := rabtap.NewAmqpSubscriber(cmd.amqpURI, cmd.tlsConfig, log)
	defer sub.Close()

	// tapCh decouples the receiving goroutine started below from this
	// goroutine, which does the actual message processing.
	tapCh := make(rabtap.TapChannel)
	go sub.EstablishSubscription(cmd.queue, tapCh)

	messageReceiveLoop(tapCh, cmd.messageReceiveFunc, cmd.signalChannel)

	log.Debug("cmdSub: cmd_subscribe ending")
}