-
Notifications
You must be signed in to change notification settings - Fork 1
/
gochannel.go
43 lines (36 loc) · 1.08 KB
/
gochannel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package mq
import (
"context"
"github.com/wfusion/gofusion/common/infra/watermill"
"github.com/wfusion/gofusion/common/infra/watermill/pubsub/gochannel"
"github.com/wfusion/gofusion/config"
)
// newGoChannel creates one gochannel.GoChannel configured from conf and wraps
// it for each role that conf enables.
//
// The same GoChannel instance backs both wrappers. pub is set only when
// conf.Producer is true and sub only when conf.Consumer is true; a role that
// is not enabled is returned as a nil interface value.
func newGoChannel(ctx context.Context, appName, name string, conf *Conf, logger watermill.LoggerAdapter) (
	pub Publisher, sub Subscriber) {
	// Publishing is explicitly configured not to block on subscriber acks.
	channel := gochannel.NewGoChannel(gochannel.Config{
		OutputChannelBuffer:            int64(conf.ConsumerConcurrency),
		Persistent:                     conf.Persistent,
		ConsumerGroup:                  conf.ConsumerGroup,
		BlockPublishUntilSubscriberAck: false,
		AppID:                          config.Use(appName).AppName(),
	}, logger)

	if conf.Producer {
		producer := &goChannel{ch: channel}
		producer.abstractMQ = newPub(ctx, channel, appName, name, conf, logger)
		pub = producer
	}

	if conf.Consumer {
		consumer := &goChannel{ch: channel}
		consumer.abstractMQ = newSub(ctx, channel, appName, name, conf, logger)
		sub = consumer
	}

	return pub, sub
}
// goChannel is the gochannel-backed message queue wrapper produced by
// newGoChannel. It embeds the shared abstractMQ and keeps a handle on the
// underlying GoChannel so that it can be closed.
type goChannel struct {
	// abstractMQ is the embedded value built by newPub or newSub.
	*abstractMQ
	// ch is the underlying gochannel instance; close delegates to it.
	ch *gochannel.GoChannel
}
// close closes the underlying GoChannel and returns whatever error its Close
// method reports.
func (g *goChannel) close() (err error) {
	err = g.ch.Close()
	return err
}