/
client_factory.go
36 lines (30 loc) · 1.14 KB
/
client_factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package helper
import (
"context"
"github.com/redis/go-redis/v9"
"github.com/segmentio/kafka-go"
"go.opentelemetry.io/otel/propagation"
"github.com/kujilabo/bamboo-root"
)
func CreateWorkerClient(ctx context.Context, workerName string, cfg *WorkerClientConfig, propagator propagation.TextMapPropagator) WorkerClient {
var rp bamboo.BambooRequestProducer
var rs bamboo.BambooResultSubscriber
if cfg.RequestProducer.Type == "kafka" {
rp = bamboo.NewKafkaBambooRequestProducer(ctx, workerName, &kafka.Writer{
Addr: kafka.TCP(cfg.RequestProducer.Kafka.Addr),
Topic: cfg.RequestProducer.Kafka.Topic,
Balancer: &kafka.LeastBytes{},
}, propagator)
} else if cfg.RequestProducer.Type == "redis" {
rp = bamboo.NewRedisBambooRequestProducer(ctx, workerName, redis.UniversalOptions{
Addrs: cfg.RequestProducer.Redis.Addrs,
}, cfg.RequestProducer.Redis.Channel, propagator)
}
if cfg.ResultSubscriber.Type == "redis" {
rs = bamboo.NewRedisResultSubscriber(ctx, workerName, &redis.UniversalOptions{
Addrs: cfg.ResultSubscriber.Redis.Addrs,
Password: cfg.ResultSubscriber.Redis.Password,
})
}
return NewWorkerClient(rp, rs)
}