-
Notifications
You must be signed in to change notification settings - Fork 0
/
get.go
91 lines (85 loc) · 2.07 KB
/
get.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package main
import (
"fmt"
"github.com/XANi/mqpp/backend"
"github.com/XANi/mqpp/common"
"github.com/XANi/mqpp/util"
"github.com/urfave/cli"
"os"
"strings"
"sync"
)
// Connections bundles a map of connected backends with an embedded mutex
// that is used to guard concurrent writes to that map.
type Connections struct {
// conns holds the connected backends; Get stores each one under the
// queue type name it was connected with.
conns map[string]common.Backend
// Embedded so that Lock/Unlock can be called directly on the struct.
sync.Mutex
}
// mqConnections is the package-level set of connected backends that Get
// populates and then reads messages from.
var mqConnections Connections
func init() {
mqConnections.conns = make(map[string]common.Backend)
}
// Get is the CLI action that connects to the message queues and prints every
// received message to stdout.
//
// Behaviour, driven by global flags read from c:
//   - "time-format": "iso" and "ts" (case-insensitive) select predefined
//     layouts; any other non-empty value is used verbatim as the layout.
//   - "mq-type": when empty, every entry of supportedMQ is tried concurrently
//     using the value of its "<mq>-url" flag. Connection failures for queues
//     left on their default URL are not logged. If nothing connects, the
//     process exits with status 1.
//   - "topic-filter": passed through to backend.Connect.
//
// After starting one printer goroutine per connected backend, Get blocks on
// the package-level end channel and then returns nil.
//
// NOTE(review): when "mq-type" is set, this function attempts no connection
// itself; presumably mqConnections is expected to be filled elsewhere, or that
// mode is not implemented yet. Confirm against the rest of the package.
func Get(c *cli.Context) error {
	mqType := c.GlobalString("mq-type")

	allURLDefault := true
	for k, v := range defaultUrls {
		if c.GlobalString(k+"-url") != v {
			allURLDefault = false
			break
		}
	}
	if allURLDefault && mqType == "" {
		log.Notice("All queue URLs are default, will try to connect to each one in turn")
	}

	if timeFormat := c.GlobalString("time-format"); len(timeFormat) > 0 {
		switch strings.ToLower(timeFormat) {
		case "iso":
			util.Formatting.TimeFormat = "2006-01-02T15:04:05.000Z07:00"
		case "ts":
			util.Formatting.TimeFormat = "15:04:05.000"
		default:
			util.Formatting.TimeFormat = timeFormat
		}
	}

	if mqType == "" {
		// Read once here instead of once per goroutine.
		topicFilter := c.GlobalString("topic-filter")

		var wg sync.WaitGroup
		for _, mq := range supportedMQ {
			url := c.GlobalString(mq + "-url")
			// Suppress errors when connecting to queues using default URLs:
			// the user did not ask for them explicitly, so failure is expected.
			suppressError := url == defaultUrls[mq]

			wg.Add(1)
			go func(mq, url string, suppressError bool) {
				defer wg.Done()
				conn, err := backend.Connect(mq, url, topicFilter)
				if err != nil {
					if !suppressError {
						log.Warningf("connection to %s failed: %s", url, err)
					}
					return
				}
				log.Noticef("connected to %s:%s", mq, url)
				mqConnections.Lock()
				mqConnections.conns[mq] = conn
				mqConnections.Unlock()
			}(mq, url, suppressError)
		}
		// All connect goroutines are finished past this point, so the map
		// can be read without holding the lock.
		wg.Wait()

		if len(mqConnections.conns) < 1 {
			log.Errorf("Nothing connected, exiting")
			os.Exit(1)
		}
	}

	for k, v := range mqConnections.conns {
		go func(k string, v common.Backend) {
			for msg := range v.GetDefault() {
				fmt.Println(util.Format(msg))
			}
			// The range loop only ends once the backend closes its channel.
			log.Warningf("connector %s closed the connection", k)
		}(k, v)
	}

	<-end
	return nil
}