@@ -17,16 +17,18 @@ func (c *Client) Publish(channel, message string) *IntCmd {
1717// http://redis.io/topics/pubsub. It's NOT safe for concurrent use by
1818// multiple goroutines.
1919type PubSub struct {
20- * baseClient
20+ base * baseClient
2121
2222 channels []string
2323 patterns []string
24+
25+ nsub int // number of active subscriptions
2426}
2527
2628// Deprecated. Use Subscribe/PSubscribe instead.
2729func (c * Client ) PubSub () * PubSub {
2830 return & PubSub {
29- baseClient : & baseClient {
31+ base : & baseClient {
3032 opt : c .opt ,
3133 connPool : newStickyConnPool (c .connPool , false ),
3234 },
@@ -46,7 +48,7 @@ func (c *Client) PSubscribe(channels ...string) (*PubSub, error) {
4648}
4749
4850func (c * PubSub ) subscribe (cmd string , channels ... string ) error {
49- cn , _ , err := c .conn ()
51+ cn , _ , err := c .base . conn ()
5052 if err != nil {
5153 return err
5254 }
@@ -65,6 +67,7 @@ func (c *PubSub) Subscribe(channels ...string) error {
6567 err := c .subscribe ("SUBSCRIBE" , channels ... )
6668 if err == nil {
6769 c .channels = append (c .channels , channels ... )
70+ c .nsub += len (channels )
6871 }
6972 return err
7073}
@@ -74,6 +77,7 @@ func (c *PubSub) PSubscribe(patterns ...string) error {
7477 err := c .subscribe ("PSUBSCRIBE" , patterns ... )
7578 if err == nil {
7679 c .patterns = append (c .patterns , patterns ... )
80+ c .nsub += len (patterns )
7781 }
7882 return err
7983}
@@ -113,8 +117,12 @@ func (c *PubSub) PUnsubscribe(patterns ...string) error {
113117 return err
114118}
115119
120+ func (c * PubSub ) Close () error {
121+ return c .base .Close ()
122+ }
123+
116124func (c * PubSub ) Ping (payload string ) error {
117- cn , _ , err := c .conn ()
125+ cn , _ , err := c .base . conn ()
118126 if err != nil {
119127 return err
120128 }
@@ -178,7 +186,7 @@ func (p *Pong) String() string {
178186 return "Pong"
179187}
180188
181- func newMessage (reply []interface {}) (interface {}, error ) {
189+ func ( c * PubSub ) newMessage (reply []interface {}) (interface {}, error ) {
182190 switch kind := reply [0 ].(string ); kind {
183191 case "subscribe" , "unsubscribe" , "psubscribe" , "punsubscribe" :
184192 return & Subscription {
@@ -210,7 +218,11 @@ func newMessage(reply []interface{}) (interface{}, error) {
210218// is not received in time. This is low-level API and most clients
211219// should use ReceiveMessage.
212220func (c * PubSub ) ReceiveTimeout (timeout time.Duration ) (interface {}, error ) {
213- cn , _ , err := c .conn ()
221+ if c .nsub == 0 {
222+ c .resubscribe ()
223+ }
224+
225+ cn , _ , err := c .base .conn ()
214226 if err != nil {
215227 return nil , err
216228 }
@@ -222,7 +234,8 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
222234 if err != nil {
223235 return nil , err
224236 }
225- return newMessage (cmd .Val ())
237+
238+ return c .newMessage (cmd .Val ())
226239}
227240
228241// Receive returns a message as a Subscription, Message, PMessage,
@@ -232,22 +245,6 @@ func (c *PubSub) Receive() (interface{}, error) {
232245 return c .ReceiveTimeout (0 )
233246}
234247
235- func (c * PubSub ) reconnect (reason error ) {
236- // Close current connection.
237- c .connPool .(* stickyConnPool ).Reset (reason )
238-
239- if len (c .channels ) > 0 {
240- if err := c .Subscribe (c .channels ... ); err != nil {
241- Logger .Printf ("Subscribe failed: %s" , err )
242- }
243- }
244- if len (c .patterns ) > 0 {
245- if err := c .PSubscribe (c .patterns ... ); err != nil {
246- Logger .Printf ("PSubscribe failed: %s" , err )
247- }
248- }
249- }
250-
251248// ReceiveMessage returns a message or error. It automatically
252249// reconnects to Redis in case of network errors.
253250func (c * PubSub ) ReceiveMessage () (* Message , error ) {
@@ -259,27 +256,25 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
259256 return nil , err
260257 }
261258
262- goodConn := errNum == 0
263259 errNum ++
264-
265- if goodConn {
260+ if errNum < 3 {
266261 if netErr , ok := err .(net.Error ); ok && netErr .Timeout () {
267262 err := c .Ping ("" )
268263 if err == nil {
269264 continue
270265 }
271266 Logger .Printf ("PubSub.Ping failed: %s" , err )
272267 }
273- }
274-
275- if errNum > 2 {
268+ } else {
269+ // 3 consequent errors - connection is bad
270+ // and/or Redis Server is down.
271+ // Sleep to not exceed max number of open connections.
276272 time .Sleep (time .Second )
277273 }
278- c .reconnect (err )
279274 continue
280275 }
281276
282- // Reset error number.
277+ // Reset error number, because we received a message .
283278 errNum = 0
284279
285280 switch msg := msgi .(type ) {
@@ -300,3 +295,22 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
300295 }
301296 }
302297}
298+
299+ func (c * PubSub ) putConn (cn * conn , err error ) {
300+ if ! c .base .putConn (cn , err ) {
301+ c .nsub = 0
302+ }
303+ }
304+
305+ func (c * PubSub ) resubscribe () {
306+ if len (c .channels ) > 0 {
307+ if err := c .Subscribe (c .channels ... ); err != nil {
308+ Logger .Printf ("Subscribe failed: %s" , err )
309+ }
310+ }
311+ if len (c .patterns ) > 0 {
312+ if err := c .PSubscribe (c .patterns ... ); err != nil {
313+ Logger .Printf ("PSubscribe failed: %s" , err )
314+ }
315+ }
316+ }
0 commit comments