Skip to content

Commit

Permalink
refactor NewPool, NewSentinel, and NewCluster to use option pattern.
Browse files Browse the repository at this point in the history
This is a _slightly_ non-backwards compatible change, which I'm gonna
say is fine since this package is technically in (the world's longest)
beta. Sentinel and Cluster aren't significantly effected in terms of
their behavior, but Pool is.

Changes to Pool's default behavior:

* When empty the pool will wait a second before creating a new
connection.

* The pool slowly refills itself, in addition to the normal
creation-on-empty logic which was there.

In addition there is significantly more cabaility opened up in the
Pool's new options, notably the ability to have an overflow bufer. When
used via PoolOnFullBuffer the pool won't immediately close connections
which are put back into a full Pool. Instead it will buffer them and
slowly close them over time, making them available to be used in the
meantime.
  • Loading branch information
Brian Picciano committed Apr 15, 2018
1 parent ad2e28d commit 93d26b0
Show file tree
Hide file tree
Showing 8 changed files with 497 additions and 116 deletions.
2 changes: 1 addition & 1 deletion bench_test.go
Expand Up @@ -63,7 +63,7 @@ func BenchmarkParallelGetSet(b *B) {
wg.Wait()
}

radix, err := NewPool("tcp", "127.0.0.1:6379", parallel, nil)
radix, err := NewPool("tcp", "127.0.0.1:6379", parallel)
if err != nil {
b.Fatal(err)
}
Expand Down
53 changes: 40 additions & 13 deletions cluster.go
Expand Up @@ -35,11 +35,27 @@ func (d *dedupe) do(fn func()) {

////////////////////////////////////////////////////////////////////////////////

type clusterOpts struct {
pf ClientFunc
}

// ClusterOpt is an optional behavior which can be applied to the NewCluster
// function to effect a Cluster's behavior
type ClusterOpt func(*clusterOpts)

// ClusterPoolFunc tells the Cluster to use the given ClientFunc when creating
// pools of connections to cluster members.
func ClusterPoolFunc(pf ClientFunc) ClusterOpt {
return func(co *clusterOpts) {
co.pf = pf
}
}

// Cluster contains all information about a redis cluster needed to interact
// with it, including a set of pools to each of its instances. All methods on
// Cluster are thread-safe
type Cluster struct {
cf ClientFunc
co clusterOpts

// used to deduplicate calls to sync
syncDedupe *dedupe
Expand All @@ -59,27 +75,38 @@ type Cluster struct {
}

// NewCluster initializes and returns a Cluster instance. It will try every
// address given until it finds a usable one. From there it use CLUSTER SLOTS to
// discover the cluster topology and make all the necessary connections.
// address given until it finds a usable one. From there it uses CLUSTER SLOTS
// to discover the cluster topology and make all the necessary connections.
//
// The ClientFunc is used to make the internal clients for the instances
// discovered here and all new ones in the future. If nil is given then
// radix.DefaultClientFunc will be used.
func NewCluster(clusterAddrs []string, cf ClientFunc) (*Cluster, error) {
if cf == nil {
cf = DefaultClientFunc
}
// NewCluster takes in a number of options which can overwrite its default
// behavior. The default options NewCluster uses are:
//
// ClusterPoolFunc(DefaultClientFunc)
//
func NewCluster(clusterAddrs []string, opts ...ClusterOpt) (*Cluster, error) {
c := &Cluster{
cf: cf,
syncDedupe: newDedupe(),
pools: map[string]Client{},
closeCh: make(chan struct{}),
ErrCh: make(chan error, 1),
}

defaultClusterOpts := []ClusterOpt{
ClusterPoolFunc(DefaultClientFunc),
}

for _, opt := range append(defaultClusterOpts, opts...) {
// the other args to NewCluster used to be a ClientFunc, which someone
// might have left as nil, in which case this now gives a weird panic.
// Just handle it
if opt != nil {
opt(&(c.co))
}
}

// make a pool to base the cluster on
for _, addr := range clusterAddrs {
p, err := cf("tcp", addr)
p, err := c.co.pf("tcp", addr)
if err != nil {
continue
}
Expand Down Expand Up @@ -152,7 +179,7 @@ func (c *Cluster) pool(addr string) (Client, error) {

// it's important that the cluster pool set isn't locked while this is
// happening, because this could block for a while
if p, err = c.cf("tcp", addr); err != nil {
if p, err = c.co.pf("tcp", addr); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion cluster_stub_test.go
Expand Up @@ -231,7 +231,7 @@ func (scl *clusterStub) addrs() []string {
}

func (scl *clusterStub) newCluster() *Cluster {
c, err := NewCluster(scl.addrs(), scl.clientFunc())
c, err := NewCluster(scl.addrs(), ClusterPoolFunc(scl.clientFunc()))
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 93d26b0

Please sign in to comment.