Skip to content

Commit 61b1dcb

Browse files
authored
Return amqp channel with RabbitMQ client (#153)
* Return amqp channel with RabbitMQ client Signed-off-by: raihankhan <raihan@appscode.com>
1 parent 1138af2 commit 61b1dcb

File tree

137 files changed

+31909
-646
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

137 files changed

+31909
-646
lines changed

go.mod

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,20 @@ require (
1919
github.com/gocql/gocql v1.6.0
2020
github.com/grafadruid/go-druid v0.0.6
2121
github.com/lib/pq v1.10.7
22-
github.com/michaelklishin/rabbit-hole/v2 v2.16.0
22+
github.com/michaelklishin/rabbit-hole/v3 v3.1.0
2323
github.com/microsoft/go-mssqldb v1.6.0
2424
github.com/opensearch-project/opensearch-go v1.1.0
2525
github.com/opensearch-project/opensearch-go/v2 v2.3.0
2626
github.com/pkg/errors v0.9.1
27-
github.com/rabbitmq/amqp091-go v1.9.0
27+
github.com/rabbitmq/amqp091-go v1.10.0
2828
github.com/redis/go-redis/v9 v9.5.1
2929
go.mongodb.org/mongo-driver v1.14.0
3030
k8s.io/api v0.30.3
3131
k8s.io/apimachinery v0.30.3
3232
k8s.io/klog/v2 v2.130.1
33-
kmodules.xyz/client-go v0.30.38
33+
kmodules.xyz/client-go v0.30.41
3434
kmodules.xyz/custom-resources v0.30.0
35-
kubedb.dev/apimachinery v0.49.0
35+
kubedb.dev/apimachinery v0.49.1-0.20241217045444-52bb83407e60
3636
sigs.k8s.io/controller-runtime v0.18.4
3737
xorm.io/xorm v1.3.6
3838
)
@@ -43,7 +43,7 @@ require (
4343
github.com/andybalholm/brotli v1.1.0 // indirect
4444
github.com/beorn7/perks v1.0.1 // indirect
4545
github.com/blang/semver/v4 v4.0.0 // indirect
46-
github.com/cert-manager/cert-manager v1.15.2 // indirect
46+
github.com/cert-manager/cert-manager v1.15.4 // indirect
4747
github.com/cespare/xxhash/v2 v2.3.0 // indirect
4848
github.com/coreos/go-semver v0.3.1 // indirect
4949
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
@@ -120,14 +120,14 @@ require (
120120
github.com/zeebo/xxh3 v1.0.2 // indirect
121121
go.opentelemetry.io/otel v1.28.0 // indirect
122122
go.opentelemetry.io/otel/trace v1.28.0 // indirect
123-
golang.org/x/crypto v0.27.0 // indirect
123+
golang.org/x/crypto v0.31.0 // indirect
124124
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
125-
golang.org/x/net v0.29.0 // indirect
125+
golang.org/x/net v0.30.0 // indirect
126126
golang.org/x/oauth2 v0.22.0 // indirect
127-
golang.org/x/sync v0.8.0 // indirect
128-
golang.org/x/sys v0.25.0 // indirect
129-
golang.org/x/term v0.24.0 // indirect
130-
golang.org/x/text v0.18.0 // indirect
127+
golang.org/x/sync v0.10.0 // indirect
128+
golang.org/x/sys v0.28.0 // indirect
129+
golang.org/x/term v0.27.0 // indirect
130+
golang.org/x/text v0.21.0 // indirect
131131
golang.org/x/time v0.5.0 // indirect
132132
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
133133
gomodules.xyz/mergo v0.3.13 // indirect
@@ -146,6 +146,7 @@ require (
146146
kmodules.xyz/monitoring-agent-api v0.30.2 // indirect
147147
kmodules.xyz/offshoot-api v0.30.1 // indirect
148148
kubeops.dev/petset v0.0.7 // indirect
149+
kubeops.dev/sidekick v0.0.10-0.20241122131943-163e27e5ef71 // indirect
149150
modernc.org/memory v1.5.0 // indirect
150151
modernc.org/token v1.1.0 // indirect
151152
sigs.k8s.io/gateway-api v1.1.0 // indirect

go.sum

Lines changed: 32 additions & 171 deletions
Large diffs are not rendered by default.

rabbitmq/client.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@ limitations under the License.
1717
package rabbitmq
1818

1919
import (
20-
rmqhttp "github.com/michaelklishin/rabbit-hole/v2"
20+
rmqhttp "github.com/michaelklishin/rabbit-hole/v3"
2121
amqp "github.com/rabbitmq/amqp091-go"
2222
)
2323

2424
type Client struct {
2525
AMQPClient
2626
HTTPClient
27+
Channel
2728
}
2829

2930
type AMQPClient struct {
@@ -37,3 +38,33 @@ type HTTPClient struct {
3738
type Channel struct {
3839
*amqp.Channel
3940
}
41+
42+
type ConnectionQueue struct {
43+
conn map[string]*Client
44+
}
45+
46+
func NewConnectionQueue() *ConnectionQueue {
47+
return &ConnectionQueue{
48+
conn: make(map[string]*Client),
49+
}
50+
}
51+
52+
func (c *ConnectionQueue) GetAMQPConnection(key string) *AMQPClient {
53+
return &c.conn[key].AMQPClient
54+
}
55+
56+
func (c *ConnectionQueue) GetHTTPConnection(key string) *HTTPClient {
57+
return &c.conn[key].HTTPClient
58+
}
59+
60+
func (c *ConnectionQueue) GetAMQPChannel(key string) *Channel {
61+
return &c.conn[key].Channel
62+
}
63+
64+
func (c *ConnectionQueue) GetClientWithKey(key string) *Client {
65+
return c.conn[key]
66+
}
67+
68+
func (c *ConnectionQueue) SetClientWithKey(key string, client *Client) {
69+
c.conn[key] = client
70+
}

rabbitmq/http_client.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package rabbitmq
1919
import (
2020
"fmt"
2121

22-
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
22+
rmqhttp "github.com/michaelklishin/rabbit-hole/v3"
2323
"k8s.io/klog/v2"
2424
)
2525

@@ -45,7 +45,7 @@ func (c *HTTPClient) IsAllNodesRunningInCluster(replicas int) (bool, error) {
4545
return true, nil
4646
}
4747

48-
func (c *HTTPClient) GetQueues() ([]rabbithole.QueueInfo, error) {
48+
func (c *HTTPClient) GetQueues() ([]rmqhttp.QueueInfo, error) {
4949
queues, err := c.Client.ListQueues()
5050
if err != nil {
5151
klog.Error(err, "Failed to get queue lists")
@@ -54,13 +54,13 @@ func (c *HTTPClient) GetQueues() ([]rabbithole.QueueInfo, error) {
5454
return queues, nil
5555
}
5656

57-
func (c *HTTPClient) GetClassicQueues() ([]rabbithole.QueueInfo, error) {
57+
func (c *HTTPClient) GetClassicQueues() ([]rmqhttp.QueueInfo, error) {
5858
queues, err := c.GetQueues()
5959
if err != nil {
6060
klog.Error(err, "Failed to get queue lists")
6161
return nil, err
6262
}
63-
classicQueues := []rabbithole.QueueInfo{}
63+
classicQueues := []rmqhttp.QueueInfo{}
6464

6565
for _, q := range queues {
6666
if q.Type == rabbitmqQueueTypeClassic {
@@ -71,7 +71,7 @@ func (c *HTTPClient) GetClassicQueues() ([]rabbithole.QueueInfo, error) {
7171
return classicQueues, nil
7272
}
7373

74-
func (c *HTTPClient) HasNodeAnyClassicQueue(queues []rabbithole.QueueInfo, node string) bool {
74+
func (c *HTTPClient) HasNodeAnyClassicQueue(queues []rmqhttp.QueueInfo, node string) bool {
7575
for _, q := range queues {
7676
if q.Type == rabbitmqQueueTypeClassic && q.Node == node {
7777
return true
@@ -80,13 +80,13 @@ func (c *HTTPClient) HasNodeAnyClassicQueue(queues []rabbithole.QueueInfo, node
8080
return false
8181
}
8282

83-
func (c *HTTPClient) GetQuorumQueues() ([]rabbithole.QueueInfo, error) {
83+
func (c *HTTPClient) GetQuorumQueues() ([]rmqhttp.QueueInfo, error) {
8484
queues, err := c.GetQueues()
8585
if err != nil {
8686
klog.Error(err, "Failed to get queue lists")
8787
return nil, err
8888
}
89-
quorumQueues := []rabbithole.QueueInfo{}
89+
quorumQueues := []rmqhttp.QueueInfo{}
9090

9191
for _, q := range queues {
9292
if q.Type == rabbitmqQueueTypeQuorum {
@@ -97,7 +97,7 @@ func (c *HTTPClient) GetQuorumQueues() ([]rabbithole.QueueInfo, error) {
9797
return quorumQueues, nil
9898
}
9999

100-
func (c *HTTPClient) IsNodePrimaryReplica(queues []rabbithole.QueueInfo, node string) bool {
100+
func (c *HTTPClient) IsNodePrimaryReplica(queues []rmqhttp.QueueInfo, node string) bool {
101101
for _, q := range queues {
102102
if q.Type == rabbitmqQueueTypeQuorum && q.Leader == node {
103103
return true

rabbitmq/kubedb_client_builder.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
"strings"
2828
"time"
2929

30-
rmqhttp "github.com/michaelklishin/rabbit-hole/v2"
30+
rmqhttp "github.com/michaelklishin/rabbit-hole/v3"
3131
amqp "github.com/rabbitmq/amqp091-go"
3232
core "k8s.io/api/core/v1"
3333
kerr "k8s.io/apimachinery/pkg/api/errors"
@@ -47,6 +47,7 @@ type KubeDBClientBuilder struct {
4747
httpURL string
4848
podName string
4949
vhost string
50+
connName string
5051
enableHTTPClient bool
5152
disableAMQPClient bool
5253
}
@@ -92,6 +93,11 @@ func (o *KubeDBClientBuilder) WithVHost(vhost string) *KubeDBClientBuilder {
9293
return o
9394
}
9495

96+
func (o *KubeDBClientBuilder) WithConnectionName(connName string) *KubeDBClientBuilder {
97+
o.connName = connName
98+
return o
99+
}
100+
95101
func (o *KubeDBClientBuilder) WithContext(ctx context.Context) *KubeDBClientBuilder {
96102
o.ctx = ctx
97103
return o
@@ -215,16 +221,31 @@ func (o *KubeDBClientBuilder) GetRabbitMQClient() (*Client, error) {
215221
o.amqpURL = o.GetAMQPconnURL(username, password, o.vhost)
216222
}
217223

224+
extraConfigProperties := amqp.NewConnectionProperties()
225+
if o.connName != "" {
226+
extraConfigProperties.SetClientConnectionName(o.connName)
227+
}
228+
218229
rabbitConnection, err := amqp.DialConfig(o.amqpURL, amqp.Config{
219-
Vhost: o.vhost,
220-
Locale: "en_US",
230+
Vhost: o.vhost,
231+
Locale: "en_US",
232+
Properties: extraConfigProperties,
221233
})
222234
if err != nil {
223235
klog.Error(err, "Failed to connect to rabbitmq")
224236
return nil, err
225237
}
226238
klog.Info("Successfully created AMQP client for RabbitMQ")
239+
240+
msgChannel, err := rabbitConnection.Channel()
241+
if err != nil {
242+
klog.Error(err, "Failed to create AMQP channel")
243+
return nil, err
244+
}
245+
klog.Info("Successfully created AMQP channel for RabbitMQ")
246+
227247
rmqClient.AMQPClient = AMQPClient{rabbitConnection}
248+
rmqClient.Channel = Channel{msgChannel}
228249
}
229250

230251
return rmqClient, nil
File renamed without changes.
File renamed without changes.
File renamed without changes.

vendor/github.com/michaelklishin/rabbit-hole/v2/ChangeLog.md renamed to vendor/github.com/michaelklishin/rabbit-hole/v3/ChangeLog.md

Lines changed: 54 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/michaelklishin/rabbit-hole/v2/LICENSE renamed to vendor/github.com/michaelklishin/rabbit-hole/v3/LICENSE

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)