Skip to content

Commit b6fdeca

Browse files
committed
Add PubSub.ChannelSize
1 parent 21913a8 commit b6fdeca

File tree

1 file changed

+19
-3
lines changed

1 file changed

+19
-3
lines changed

pubsub.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -400,14 +400,30 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
400400
//
401401
// If the Go channel is full for 30 seconds the message is dropped.
402402
func (c *PubSub) Channel() <-chan *Message {
403-
c.chOnce.Do(c.initChannel)
403+
return c.channel(100)
404+
}
405+
406+
// ChannelSize is like Channel, but creates a Go channel
407+
// with specified buffer size.
408+
func (c *PubSub) ChannelSize(size int) <-chan *Message {
409+
return c.channel(size)
410+
}
411+
412+
func (c *PubSub) channel(size int) <-chan *Message {
413+
c.chOnce.Do(func() {
414+
c.initChannel(size)
415+
})
416+
if cap(c.ch) != size {
417+
err := fmt.Errorf("redis: PubSub.Channel is called with different buffer size")
418+
panic(err)
419+
}
404420
return c.ch
405421
}
406422

407-
func (c *PubSub) initChannel() {
423+
func (c *PubSub) initChannel(size int) {
408424
const timeout = 30 * time.Second
409425

410-
c.ch = make(chan *Message, 100)
426+
c.ch = make(chan *Message, size)
411427
c.ping = make(chan struct{}, 1)
412428

413429
go func() {

0 commit comments

Comments
 (0)