diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index 5754709932..1b6a681b1b 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -184,11 +184,13 @@ func (c *MemcachedClientConfig) validate() error { return errMemcachedMaxAsyncConcurrencyNotPositive } - if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { - return errCircuitBreakerConsecutiveFailuresNotPositive - } - if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { - return errCircuitBreakerFailurePercentInvalid + if c.SetAsyncCircuitBreakerEnabled { + if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { + return errCircuitBreakerConsecutiveFailuresNotPositive + } + if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { + return errCircuitBreakerFailurePercentInvalid + } } return nil } diff --git a/pkg/cacheutil/memcached_client_test.go b/pkg/cacheutil/memcached_client_test.go index 297ff7d281..3ebf8b515d 100644 --- a/pkg/cacheutil/memcached_client_test.go +++ b/pkg/cacheutil/memcached_client_test.go @@ -36,8 +36,6 @@ func TestMemcachedClientConfig_validate(t *testing.T) { Addresses: []string{"127.0.0.1:11211"}, MaxAsyncConcurrency: 1, DNSProviderUpdateInterval: time.Second, - SetAsyncCircuitBreakerConsecutiveFailures: 1, - SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: nil, }, @@ -46,8 +44,6 @@ func TestMemcachedClientConfig_validate(t *testing.T) { Addresses: []string{}, MaxAsyncConcurrency: 1, DNSProviderUpdateInterval: time.Second, - SetAsyncCircuitBreakerConsecutiveFailures: 1, - SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: errMemcachedConfigNoAddrs, }, @@ -56,8 +52,6 @@ func TestMemcachedClientConfig_validate(t *testing.T) { Addresses: []string{"127.0.0.1:11211"}, MaxAsyncConcurrency: 0, DNSProviderUpdateInterval: time.Second, - SetAsyncCircuitBreakerConsecutiveFailures: 1, - SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: errMemcachedMaxAsyncConcurrencyNotPositive, }, @@ -65,25 +59,25 @@ func TestMemcachedClientConfig_validate(t *testing.T) { config: MemcachedClientConfig{ Addresses: []string{"127.0.0.1:11211"}, MaxAsyncConcurrency: 1, - SetAsyncCircuitBreakerConsecutiveFailures: 1, - SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: errMemcachedDNSUpdateIntervalNotPositive, }, "should fail on circuit_breaker_consecutive_failures = 0": { config: MemcachedClientConfig{ - Addresses: []string{"127.0.0.1:11211"}, - MaxAsyncConcurrency: 1, - DNSProviderUpdateInterval: time.Second, + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerEnabled: true, SetAsyncCircuitBreakerConsecutiveFailures: 0, }, expected: errCircuitBreakerConsecutiveFailuresNotPositive, }, "should fail on circuit_breaker_failure_percent <= 0": { config: MemcachedClientConfig{ - Addresses: []string{"127.0.0.1:11211"}, - MaxAsyncConcurrency: 1, - DNSProviderUpdateInterval: time.Second, + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerEnabled: true, SetAsyncCircuitBreakerConsecutiveFailures: 1, SetAsyncCircuitBreakerFailurePercent: 0, }, @@ -91,9 +85,10 @@ func TestMemcachedClientConfig_validate(t *testing.T) { }, "should fail on circuit_breaker_failure_percent >= 1": { config: MemcachedClientConfig{ - Addresses: []string{"127.0.0.1:11211"}, - MaxAsyncConcurrency: 1, - DNSProviderUpdateInterval: time.Second, + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerEnabled: true, SetAsyncCircuitBreakerConsecutiveFailures: 1, SetAsyncCircuitBreakerFailurePercent: 1.1, }, @@ -725,7 +720,7 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) { config := defaultMemcachedClientConfig config.Addresses = []string{"127.0.0.1:11211"} config.SetAsyncCircuitBreakerEnabled = true - config.SetAsyncCircuitBreakerOpenDuration = 1 * time.Millisecond + config.SetAsyncCircuitBreakerOpenDuration = 2 * time.Millisecond config.SetAsyncCircuitBreakerHalfOpenMaxRequests = 100 config.SetAsyncCircuitBreakerMinRequests = testdata.minRequests config.SetAsyncCircuitBreakerConsecutiveFailures = testdata.consecutiveFailures @@ -746,7 +741,11 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) { testutil.Ok(t, backendMock.waitSetCount(testdata.setErrors)) cbimpl := client.setAsyncCircuitBreaker.(gobreakerCircuitBreaker).CircuitBreaker if testdata.expectCircuitBreakerOpen { - testutil.Equals(t, gobreaker.StateOpen, cbimpl.State()) + // Trigger the state transaction. + time.Sleep(time.Millisecond) + testutil.Ok(t, client.SetAsync(strconv.Itoa(testdata.setErrors), []byte("value"), time.Second)) + testutil.Equals(t, gobreaker.StateOpen, cbimpl.State(), "state should be open") + time.Sleep(config.SetAsyncCircuitBreakerOpenDuration) for i := testdata.setErrors; i < testdata.setErrors+10; i++ { testutil.Ok(t, client.SetAsync(strconv.Itoa(i), []byte("value"), time.Second)) diff --git a/pkg/cacheutil/redis_client.go b/pkg/cacheutil/redis_client.go index 6a0a9aedc6..b6c63bc619 100644 --- a/pkg/cacheutil/redis_client.go +++ b/pkg/cacheutil/redis_client.go @@ -155,11 +155,13 @@ func (c *RedisClientConfig) validate() error { } } - if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { - return errCircuitBreakerConsecutiveFailuresNotPositive - } - if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { - return errCircuitBreakerFailurePercentInvalid + if c.SetAsyncCircuitBreakerEnabled { + if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { + return errCircuitBreakerConsecutiveFailuresNotPositive + } + if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { + return errCircuitBreakerFailurePercentInvalid + } } return nil } diff --git a/pkg/cacheutil/redis_client_test.go b/pkg/cacheutil/redis_client_test.go index 6e755235fd..6845b45663 100644 --- a/pkg/cacheutil/redis_client_test.go +++ b/pkg/cacheutil/redis_client_test.go @@ -197,10 +197,23 @@ func TestValidateRedisConfig(t *testing.T) { }, expect_err: true, }, + { + name: "SetAsyncCircuitBreakerDisabled", + config: func() RedisClientConfig { + cfg := DefaultRedisClientConfig + cfg.Addr = "127.0.0.1:6789" + cfg.SetAsyncCircuitBreakerEnabled = false + cfg.SetAsyncCircuitBreakerConsecutiveFailures = 0 + return cfg + }, + expect_err: false, + }, { name: "invalidCircuitBreakerFailurePercent", config: func() RedisClientConfig { cfg := DefaultRedisClientConfig + cfg.Addr = "127.0.0.1:6789" + cfg.SetAsyncCircuitBreakerEnabled = true cfg.SetAsyncCircuitBreakerConsecutiveFailures = 0 return cfg }, @@ -210,6 +223,8 @@ func TestValidateRedisConfig(t *testing.T) { name: "invalidCircuitBreakerFailurePercent", config: func() RedisClientConfig { cfg := DefaultRedisClientConfig + cfg.Addr = "127.0.0.1:6789" + cfg.SetAsyncCircuitBreakerEnabled = true cfg.SetAsyncCircuitBreakerFailurePercent = 0 return cfg },