Skip to content

Commit

Permalink
Update some mailgun deps to remove dependency on mgo (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
freeformz committed May 6, 2022
1 parent 07d2d56 commit b2b49e3
Show file tree
Hide file tree
Showing 56 changed files with 3,218 additions and 548 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
run:
deadline: 5m
skip-files: [ ]
skip-dirs: [ ]
skip-dirs: ["internal/holsterv4"]

linters-settings:
govet:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export GO111MODULE=on
default: clean checks test

test: clean
go test -race -cover ./...
go test -race -cover -count 1 ./...

test-verbose: clean
go test -v -race -cover ./...
Expand Down
46 changes: 17 additions & 29 deletions cbreaker/cbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"sync"
"time"

"github.com/mailgun/timetools"
log "github.com/sirupsen/logrus"
"github.com/vulcand/oxy/internal/holsterv4/clock"
"github.com/vulcand/oxy/memmetrics"
"github.com/vulcand/oxy/utils"
)
Expand All @@ -51,18 +51,16 @@ type CircuitBreaker struct {
onStandby SideEffect

state cbState
until time.Time
until clock.Time

rc *ratioController

checkPeriod time.Duration
lastCheck time.Time
lastCheck clock.Time

fallback http.Handler
next http.Handler

clock timetools.TimeProvider

log *log.Logger
}

Expand All @@ -72,7 +70,6 @@ func New(next http.Handler, expression string, options ...CircuitBreakerOption)
m: &sync.RWMutex{},
next: next,
// Default values. Might be overwritten by options below.
clock: &timetools.RealTime{},
checkPeriod: defaultCheckPeriod,
fallbackDuration: defaultFallbackDuration,
recoveryDuration: defaultRecoveryDuration,
Expand Down Expand Up @@ -151,16 +148,16 @@ func (c *CircuitBreaker) activateFallback(_ http.ResponseWriter, _ *http.Request
// someone else has set it to standby just now
return false
case stateTripped:
if c.clock.UtcNow().Before(c.until) {
if clock.Now().UTC().Before(c.until) {
return true
}
// We have been in active state enough, enter recovering state
c.setRecovering()
fallthrough
case stateRecovering:
// We have been in recovering state enough, enter standby and allow request
if c.clock.UtcNow().After(c.until) {
c.setState(stateStandby, c.clock.UtcNow())
if clock.Now().UTC().After(c.until) {
c.setState(stateStandby, clock.Now().UTC())
return false
}
// ratio controller allows this request
Expand All @@ -173,12 +170,12 @@ func (c *CircuitBreaker) activateFallback(_ http.ResponseWriter, _ *http.Request
}

func (c *CircuitBreaker) serve(w http.ResponseWriter, req *http.Request) {
start := c.clock.UtcNow()
start := clock.Now().UTC()
p := utils.NewProxyWriterWithLogger(w, c.log)

c.next.ServeHTTP(p, req)

latency := c.clock.UtcNow().Sub(start)
latency := clock.Now().UTC().Sub(start)
c.metrics.Record(p.StatusCode(), latency)

// Note that this call is less expensive than it looks -- checkCondition only performs the real check
Expand Down Expand Up @@ -229,7 +226,7 @@ func (c *CircuitBreaker) setState(state cbState, until time.Time) {
func (c *CircuitBreaker) timeToCheck() bool {
c.m.RLock()
defer c.m.RUnlock()
return c.clock.UtcNow().After(c.lastCheck)
return clock.Now().UTC().After(c.lastCheck)
}

// Checks if tripping condition matches and sets circuit breaker to the tripped state.
Expand All @@ -242,10 +239,10 @@ func (c *CircuitBreaker) checkAndSet() {
defer c.m.Unlock()

// Other goroutine could have updated the lastCheck variable before we grabbed mutex
if !c.clock.UtcNow().After(c.lastCheck) {
if !clock.Now().UTC().After(c.lastCheck) {
return
}
c.lastCheck = c.clock.UtcNow().Add(c.checkPeriod)
c.lastCheck = clock.Now().UTC().Add(c.checkPeriod)

if c.state == stateTripped {
c.log.Debugf("%v skip set tripped", c)
Expand All @@ -256,28 +253,19 @@ func (c *CircuitBreaker) checkAndSet() {
return
}

c.setState(stateTripped, c.clock.UtcNow().Add(c.fallbackDuration))
c.setState(stateTripped, clock.Now().UTC().Add(c.fallbackDuration))
c.metrics.Reset()
}

func (c *CircuitBreaker) setRecovering() {
c.setState(stateRecovering, c.clock.UtcNow().Add(c.recoveryDuration))
c.rc = newRatioController(c.clock, c.recoveryDuration, c.log)
c.setState(stateRecovering, clock.Now().UTC().Add(c.recoveryDuration))
c.rc = newRatioController(c.recoveryDuration, c.log)
}

// CircuitBreakerOption represents an option you can pass to New.
// See the documentation for the individual options below.
type CircuitBreakerOption func(*CircuitBreaker) error

// Clock allows you to fake che CircuitBreaker's view of the current time.
// Intended for unit tests.
func Clock(clock timetools.TimeProvider) CircuitBreakerOption {
return func(c *CircuitBreaker) error {
c.clock = clock
return nil
}
}

// FallbackDuration is how long the CircuitBreaker will remain in the Tripped
// state before trying to recover.
func FallbackDuration(d time.Duration) CircuitBreakerOption {
Expand Down Expand Up @@ -357,9 +345,9 @@ const (
)

const (
defaultFallbackDuration = 10 * time.Second
defaultRecoveryDuration = 10 * time.Second
defaultCheckPeriod = 100 * time.Millisecond
defaultFallbackDuration = 10 * clock.Second
defaultRecoveryDuration = 10 * clock.Second
defaultCheckPeriod = 100 * clock.Millisecond
)

var defaultFallback = &fallback{}
Expand Down
42 changes: 23 additions & 19 deletions cbreaker/cbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vulcand/oxy/internal/holsterv4/clock"
"github.com/vulcand/oxy/memmetrics"
"github.com/vulcand/oxy/testutils"
)
Expand Down Expand Up @@ -39,9 +40,10 @@ func TestFullCycle(t *testing.T) {
_, _ = w.Write([]byte("hello"))
})

clock := testutils.GetClock()
done := testutils.FreezeTime()
defer done()

cb, err := New(handler, triggerNetRatio, Clock(clock))
cb, err := New(handler, triggerNetRatio)
require.NoError(t, err)

srv := httptest.NewServer(cb)
Expand All @@ -52,27 +54,27 @@ func TestFullCycle(t *testing.T) {
assert.Equal(t, http.StatusOK, re.StatusCode)

cb.metrics = statsNetErrors(0.6)
clock.CurrentTime = clock.CurrentTime.Add(defaultCheckPeriod + time.Millisecond)
clock.Advance(defaultCheckPeriod + clock.Millisecond)
_, _, err = testutils.Get(srv.URL)
require.NoError(t, err)
assert.Equal(t, cbState(stateTripped), cb.state)

// Some time has passed, but we are still in trapped state.
clock.CurrentTime = clock.CurrentTime.Add(9 * time.Second)
clock.Advance(9 * clock.Second)
re, _, err = testutils.Get(srv.URL)
require.NoError(t, err)
assert.Equal(t, http.StatusServiceUnavailable, re.StatusCode)
assert.Equal(t, cbState(stateTripped), cb.state)

// We should be in recovering state by now
clock.CurrentTime = clock.CurrentTime.Add(time.Second*1 + time.Millisecond)
clock.Advance(clock.Second*1 + clock.Millisecond)
re, _, err = testutils.Get(srv.URL)
require.NoError(t, err)
assert.Equal(t, http.StatusServiceUnavailable, re.StatusCode)
assert.Equal(t, cbState(stateRecovering), cb.state)

// 5 seconds after we should be allowing some requests to pass
clock.CurrentTime = clock.CurrentTime.Add(5 * time.Second)
clock.Advance(5 * clock.Second)
allowed := 0
for i := 0; i < 100; i++ {
re, _, err = testutils.Get(srv.URL)
Expand All @@ -83,7 +85,7 @@ func TestFullCycle(t *testing.T) {
assert.NotEqual(t, 0, allowed)

// After some time, all is good and we should be in stand by mode again
clock.CurrentTime = clock.CurrentTime.Add(5*time.Second + time.Millisecond)
clock.Advance(5*clock.Second + clock.Millisecond)
re, _, err = testutils.Get(srv.URL)
assert.Equal(t, cbState(stateStandby), cb.state)
require.NoError(t, err)
Expand All @@ -101,7 +103,7 @@ func TestRedirectWithPath(t *testing.T) {
})
require.NoError(t, err)

cb, err := New(handler, triggerNetRatio, Clock(testutils.GetClock()), Fallback(fallbackRedirectPath))
cb, err := New(handler, triggerNetRatio, Fallback(fallbackRedirectPath))
require.NoError(t, err)

srv := httptest.NewServer(cb)
Expand Down Expand Up @@ -131,7 +133,7 @@ func TestRedirect(t *testing.T) {
fallbackRedirect, err := NewRedirectFallback(Redirect{URL: "http://localhost:5000"})
require.NoError(t, err)

cb, err := New(handler, triggerNetRatio, Clock(testutils.GetClock()), Fallback(fallbackRedirect))
cb, err := New(handler, triggerNetRatio, Fallback(fallbackRedirect))
require.NoError(t, err)

srv := httptest.NewServer(cb)
Expand All @@ -158,9 +160,10 @@ func TestTriggerDuringRecovery(t *testing.T) {
_, _ = w.Write([]byte("hello"))
})

clock := testutils.GetClock()
done := testutils.FreezeTime()
defer done()

cb, err := New(handler, triggerNetRatio, Clock(clock), CheckPeriod(time.Microsecond))
cb, err := New(handler, triggerNetRatio, CheckPeriod(clock.Microsecond))
require.NoError(t, err)

srv := httptest.NewServer(cb)
Expand All @@ -172,14 +175,14 @@ func TestTriggerDuringRecovery(t *testing.T) {
assert.Equal(t, cbState(stateTripped), cb.state)

// We should be in recovering state by now
clock.CurrentTime = clock.CurrentTime.Add(10*time.Second + time.Millisecond)
clock.Advance(10*clock.Second + clock.Millisecond)
re, _, err := testutils.Get(srv.URL)
require.NoError(t, err)
assert.Equal(t, http.StatusServiceUnavailable, re.StatusCode)
assert.Equal(t, cbState(stateRecovering), cb.state)

// We have matched error condition during recovery state and are going back to tripped state
clock.CurrentTime = clock.CurrentTime.Add(5 * time.Second)
clock.Advance(5 * clock.Second)
cb.metrics = statsNetErrors(0.6)
allowed := 0
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -234,9 +237,10 @@ func TestSideEffects(t *testing.T) {
_, _ = w.Write([]byte("hello"))
})

clock := testutils.GetClock()
done := testutils.FreezeTime()
defer done()

cb, err := New(handler, triggerNetRatio, Clock(clock), CheckPeriod(time.Microsecond), OnTripped(onTripped), OnStandby(onStandby))
cb, err := New(handler, triggerNetRatio, CheckPeriod(clock.Microsecond), OnTripped(onTripped), OnStandby(onStandby))
require.NoError(t, err)

srv := httptest.NewServer(cb)
Expand All @@ -254,19 +258,19 @@ func TestSideEffects(t *testing.T) {
assert.Equal(t, "/post.json", req.URL.Path)
assert.Equal(t, `{"Key": ["val1", "val2"]}`, string(srv1Body))
assert.Equal(t, "application/json", req.Header.Get("Content-Type"))
case <-time.After(time.Second):
case <-clock.After(clock.Second):
t.Error("timeout waiting for side effect to kick off")
}

// Transition to recovering state
clock.CurrentTime = clock.CurrentTime.Add(10*time.Second + time.Millisecond)
clock.Advance(10*clock.Second + clock.Millisecond)
cb.metrics = statsOK()
_, _, err = testutils.Get(srv.URL)
require.NoError(t, err)
assert.Equal(t, cbState(stateRecovering), cb.state)

// Going back to standby
clock.CurrentTime = clock.CurrentTime.Add(10*time.Second + time.Millisecond)
clock.Advance(10*clock.Second + clock.Millisecond)
_, _, err = testutils.Get(srv.URL)
require.NoError(t, err)
assert.Equal(t, cbState(stateStandby), cb.state)
Expand All @@ -276,7 +280,7 @@ func TestSideEffects(t *testing.T) {
assert.Equal(t, http.MethodPost, req.Method)
assert.Equal(t, "/post", req.URL.Path)
assert.Equal(t, url.Values{"key": []string{"val1", "val2"}}, req.Form)
case <-time.After(time.Second):
case <-clock.After(clock.Second):
t.Error("timeout waiting for side effect to kick off")
}
}
Expand Down
4 changes: 2 additions & 2 deletions cbreaker/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package cbreaker

import (
"fmt"
"time"

"github.com/vulcand/oxy/internal/holsterv4/clock"
"github.com/vulcand/predicate"
)

Expand Down Expand Up @@ -53,7 +53,7 @@ func latencyAtQuantile(quantile float64) toInt {
c.log.Errorf("Failed to get latency histogram, for %v error: %v", c, err)
return 0
}
return int(h.LatencyAtQuantile(quantile) / time.Millisecond)
return int(h.LatencyAtQuantile(quantile) / clock.Millisecond)
}
}

Expand Down
6 changes: 3 additions & 3 deletions cbreaker/predicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package cbreaker

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vulcand/oxy/internal/holsterv4/clock"
"github.com/vulcand/oxy/memmetrics"
)

Expand All @@ -27,12 +27,12 @@ func TestTripped(t *testing.T) {
},
{
expression: "LatencyAtQuantileMS(50.0) > 50",
metrics: statsLatencyAtQuantile(50, time.Millisecond*51),
metrics: statsLatencyAtQuantile(50, clock.Millisecond*51),
expected: true,
},
{
expression: "LatencyAtQuantileMS(50.0) < 50",
metrics: statsLatencyAtQuantile(50, time.Millisecond*51),
metrics: statsLatencyAtQuantile(50, clock.Millisecond*51),
expected: false,
},
{
Expand Down
Loading

0 comments on commit b2b49e3

Please sign in to comment.