Skip to content

Commit

Permalink
make PersistantPubSub take in a ConnFunc
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Picciano committed Jul 2, 2017
1 parent 2df9267 commit 1cb4ebc
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 14 deletions.
2 changes: 0 additions & 2 deletions TODO
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
weird
- If someone were to make a "RetryAction" kind of thing it would not play
nicely with cluster
- PubSub stuff is kind of a mess (in what way?)
- PersistentPubSub(...) should maybe take in a ConnFunc?
- do all the structs types in resp need to be structs?
- Read through docs, ensure everything is clean and sensical
- Read through code, make sure all code comments are correct
Expand Down
13 changes: 8 additions & 5 deletions pubsub_persistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ type persistentPubSub struct {

// PersistentPubSub is like PubSub, but instead of taking in an existing Conn to
// wrap it will create one on the fly. If the connection is ever terminated then
// a new one will be created using the dialFn. None of the methods on the
// returned Conn will ever return an error, they will instead block until a
// connection can be successfully reinstated.
func PersistentPubSub(dialFn func() (Conn, error)) PubSubConn {
// a new one will be created using the connFn (which defaults to Dial if nil).
// None of the methods on the returned PubSubConn will ever return an error,
// they will instead block until a connection can be successfully reinstated.
func PersistentPubSub(network, addr string, connFn ConnFunc) PubSubConn {
if connFn == nil {
connFn = Dial
}
p := &persistentPubSub{
dial: dialFn,
dial: func() (Conn, error) { return connFn(network, addr) },
subs: chanSet{},
psubs: chanSet{},
closeCh: make(chan struct{}),
Expand Down
2 changes: 1 addition & 1 deletion pubsub_persistent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func TestPersistentPubSub(t *T) {
closeCh := make(chan chan bool)
p := PersistentPubSub(func() (Conn, error) {
p := PersistentPubSub("", "", func(_, _ string) (Conn, error) {
c := dial()
go func() {
closeRetCh := <-closeCh
Expand Down
14 changes: 8 additions & 6 deletions sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ type Sentinel struct {
testEventCh chan string
}

// NewSentinel creates and returns a *Sentinel. dialFn may be nil, but if given
// NewSentinel creates and returns a *Sentinel. connFn may be nil, but if given
// can specify a custom ConnFunc to use when connecting to sentinels. clientFn
// may be nil, but if given can specify a custom ClientFunc to use when creating
// a client to the master instance.
func NewSentinel(masterName string, sentinelAddrs []string, dialFn ConnFunc, clientFn ClientFunc) (*Sentinel, error) {
if dialFn == nil {
dialFn = func(net, addr string) (Conn, error) {
func NewSentinel(masterName string, sentinelAddrs []string, connFn ConnFunc, clientFn ClientFunc) (*Sentinel, error) {
if connFn == nil {
connFn = func(net, addr string) (Conn, error) {
return DialTimeout(net, addr, 5*time.Second)
}
}
Expand All @@ -73,7 +73,7 @@ func NewSentinel(masterName string, sentinelAddrs []string, dialFn ConnFunc, cli
initAddrs: sentinelAddrs,
name: masterName,
addrs: addrs,
dfn: dialFn,
dfn: connFn,
cfn: clientFn,
pconnCh: make(chan PubSubMessage),
ErrCh: make(chan error, 1),
Expand All @@ -99,7 +99,9 @@ func NewSentinel(masterName string, sentinelAddrs []string, dialFn ConnFunc, cli
}

// because we're using persistent these can't _really_ fail
sc.pconn = PersistentPubSub(sc.dial)
sc.pconn = PersistentPubSub("", "", func(_, _ string) (Conn, error) {
return sc.dial()
})
sc.pconn.Subscribe(sc.pconnCh, "switch-master")

sc.closeWG.Add(2)
Expand Down

0 comments on commit 1cb4ebc

Please sign in to comment.