Skip to content

Commit

Permalink
feat: add health event manager and rules readiness probe (#674)
Browse files Browse the repository at this point in the history
  • Loading branch information
ArthurKnoep committed Apr 6, 2021
1 parent 0823fe7 commit 01d8588
Show file tree
Hide file tree
Showing 14 changed files with 375 additions and 266 deletions.
42 changes: 38 additions & 4 deletions api/health_test.go
Expand Up @@ -5,14 +5,23 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ory/oathkeeper/internal"
rulereadiness "github.com/ory/oathkeeper/rule/readiness"
"github.com/ory/oathkeeper/x"
)

type statusResult struct {
// Status should contains "ok" in case of success
Status string `json:"status"`
// Otherwise a map of error messages is returned
Errors map[string]string `json:"errors"`
}

func TestHealth(t *testing.T) {
conf := internal.NewConfigurationWithDefaults()
r := internal.NewRegistry(conf)
Expand All @@ -22,22 +31,47 @@ func TestHealth(t *testing.T) {
server := httptest.NewServer(router)
defer server.Close()

var result struct {
// Status always contains "ok".
Status string `json:"status"`
}
var result statusResult

// Checking health state before initializing the registry
res, err := server.Client().Get(server.URL + "/health/alive")
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusOK, res.StatusCode)
require.NoError(t, json.NewDecoder(res.Body).Decode(&result))
assert.Equal(t, "ok", result.Status)
assert.Len(t, result.Errors, 0)

result = statusResult{}
res, err = server.Client().Get(server.URL + "/health/ready")
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusServiceUnavailable, res.StatusCode)
require.NoError(t, json.NewDecoder(res.Body).Decode(&result))
assert.Empty(t, result.Status)
assert.Len(t, result.Errors, 1)
assert.Equal(t, rulereadiness.ErrRuleNotYetLoaded.Error(), result.Errors[rulereadiness.ProbeName])

r.Init()
// Waiting for rule load and health event propagation
time.Sleep(100 * time.Millisecond)

// Checking health state after initializing the registry
result = statusResult{}
res, err = server.Client().Get(server.URL + "/health/alive")
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusOK, res.StatusCode)
require.NoError(t, json.NewDecoder(res.Body).Decode(&result))
assert.Equal(t, "ok", result.Status)
assert.Len(t, result.Errors, 0)

result = statusResult{}
res, err = server.Client().Get(server.URL + "/health/ready")
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusOK, res.StatusCode)
require.NoError(t, json.NewDecoder(res.Body).Decode(&result))
assert.Equal(t, "ok", result.Status)
assert.Len(t, result.Errors, 0)
}
16 changes: 16 additions & 0 deletions driver/health/event_manager.go
@@ -0,0 +1,16 @@
package health

import (
"context"

"github.com/ory/x/healthx"
"github.com/pkg/errors"
)

type EventManager interface {
Dispatch(event ReadinessProbeEvent)
Watch(ctx context.Context)
HealthxReadyCheckers() healthx.ReadyCheckers
}

var ErrEventTypeAlreadyRegistered = errors.New("event type already registered")
66 changes: 66 additions & 0 deletions driver/health/event_manager_default.go
@@ -0,0 +1,66 @@
package health

import (
"context"

"github.com/ory/x/healthx"
"github.com/pkg/errors"
)

type DefaultHealthEventManager struct {
evtChan chan ReadinessProbeEvent
listeners []ReadinessProbe
listenerEventTypeCache map[string]ReadinessProbe
}

func NewDefaultHealthEventManager(listeners ...ReadinessProbe) (*DefaultHealthEventManager, error) {
var listenerEventTypeCache = make(map[string]ReadinessProbe)
for _, listener := range listeners {
for _, events := range listener.EventTypes() {
if _, ok := listenerEventTypeCache[events.ReadinessProbeListenerID()]; ok {
return nil, errors.WithStack(ErrEventTypeAlreadyRegistered)
}
listenerEventTypeCache[events.ReadinessProbeListenerID()] = listener
}
}
return &DefaultHealthEventManager{
evtChan: make(chan ReadinessProbeEvent),
listeners: listeners,
listenerEventTypeCache: listenerEventTypeCache,
}, nil
}

func (h *DefaultHealthEventManager) Dispatch(event ReadinessProbeEvent) {
if event == nil {
return
}
go func() {
h.evtChan <- event
}()
}

func (h *DefaultHealthEventManager) Watch(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
return
case evt, ok := <-h.evtChan:
if !ok {
return
}
if listener, ok := h.listenerEventTypeCache[evt.ReadinessProbeListenerID()]; ok {
listener.EventsReceiver(evt)
}
}
}
}()
}

func (h *DefaultHealthEventManager) HealthxReadyCheckers() healthx.ReadyCheckers {
var checkers = make(healthx.ReadyCheckers)
for _, listener := range h.listeners {
checkers[listener.ID()] = listener.Validate
}
return checkers
}
104 changes: 104 additions & 0 deletions driver/health/event_manager_default_test.go
@@ -0,0 +1,104 @@
package health

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

const mockReadinessProbeName = "mock-readiness-probe"

type (
mockReadinessProbe struct {
hasReceivedEvent bool
testData string
}
mockReadinessEvent struct {
testData string
}
)

func (m *mockReadinessProbe) ID() string {
return mockReadinessProbeName
}

func (m *mockReadinessProbe) Validate() error {
return nil
}

func (m *mockReadinessProbe) EventTypes() []ReadinessProbeEvent {
return []ReadinessProbeEvent{&mockReadinessEvent{}}
}

func (m *mockReadinessProbe) EventsReceiver(evt ReadinessProbeEvent) {
switch castedEvent := evt.(type) {
case *mockReadinessEvent:
m.hasReceivedEvent = true
m.testData = castedEvent.testData
}
}

func (m *mockReadinessEvent) ReadinessProbeListenerID() string {
return mockReadinessProbeName
}

func TestNewDefaultHealthEventManager(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("health event manager", func(t *testing.T) {
readinessProbe := &mockReadinessProbe{}

// Create a new default health event manager with twice same probe
_, err := NewDefaultHealthEventManager(readinessProbe, readinessProbe)
require.Error(t, err)

// Create a new default health event manager
hem, err := NewDefaultHealthEventManager(readinessProbe)
require.NoError(t, err)

// Test healthx ready checkers generation
checkers := hem.HealthxReadyCheckers()
require.Len(t, checkers, 1)
_, ok := checkers[readinessProbe.ID()]
require.True(t, ok, "health checker was not found")

// Readiness probe must be empty before event dispatch
require.False(t, readinessProbe.hasReceivedEvent)
require.Equal(t, readinessProbe.testData, "")

// Nil events should be ignored
hem.Dispatch(nil)
require.False(t, readinessProbe.hasReceivedEvent)

// Dispatch event without watching (should not block)
const testData = "a sample string that will be passed along the event"
hem.Dispatch(&mockReadinessEvent{
testData: testData,
})

// Watching for incoming events
hem.Watch(ctx)

// Waiting for watcher to be ready, then verify the event has been received
time.Sleep(100 * time.Millisecond)
require.True(t, readinessProbe.hasReceivedEvent)
require.Equal(t, readinessProbe.testData, testData)

// Reset probe
readinessProbe.hasReceivedEvent = false
readinessProbe.testData = ""

// Dispatch a new event
hem.Dispatch(&mockReadinessEvent{
testData: testData,
})

// Wait for event propagation, then verify the event has been received
time.Sleep(100 * time.Millisecond)
require.True(t, readinessProbe.hasReceivedEvent)
require.Equal(t, readinessProbe.testData, testData)
cancel()
})
}
15 changes: 15 additions & 0 deletions driver/health/readiness.go
@@ -0,0 +1,15 @@
package health

type (
ReadinessProbe interface {
ID() string
Validate() error

EventTypes() []ReadinessProbeEvent
EventsReceiver(event ReadinessProbeEvent)
}

ReadinessProbeEvent interface {
ReadinessProbeListenerID() string
}
)
8 changes: 5 additions & 3 deletions driver/registry.go
Expand Up @@ -3,12 +3,13 @@ package driver
import (
"github.com/ory/x/logrusx"

"github.com/ory/oathkeeper/pipeline/errors"
"github.com/ory/oathkeeper/proxy"

"github.com/ory/x/healthx"
"github.com/ory/x/tracing"

"github.com/ory/oathkeeper/driver/health"
"github.com/ory/oathkeeper/pipeline/errors"
"github.com/ory/oathkeeper/proxy"

"github.com/ory/oathkeeper/api"
"github.com/ory/oathkeeper/credentials"
"github.com/ory/oathkeeper/driver/configuration"
Expand All @@ -30,6 +31,7 @@ type Registry interface {
BuildHash() string

ProxyRequestHandler() *proxy.RequestHandler
HealthEventManager() health.EventManager
HealthHandler() *healthx.Handler
RuleHandler() *api.RuleHandler
DecisionHandler() *api.DecisionHandler
Expand Down
23 changes: 21 additions & 2 deletions driver/registry_memory.go
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"github.com/ory/oathkeeper/driver/health"
"github.com/ory/oathkeeper/pipeline"
pe "github.com/ory/oathkeeper/pipeline/errors"
"github.com/ory/oathkeeper/proxy"
Expand All @@ -26,6 +27,7 @@ import (
ep "github.com/ory/oathkeeper/pipeline/errors"
"github.com/ory/oathkeeper/pipeline/mutate"
"github.com/ory/oathkeeper/rule"
rulereadiness "github.com/ory/oathkeeper/rule/readiness"
)

var _ Registry = new(RegistryMemory)
Expand Down Expand Up @@ -61,6 +63,8 @@ type RegistryMemory struct {
mutators map[string]mutate.Mutator
errors map[string]ep.Handler

healthEventManager *health.DefaultHealthEventManager

ruleRepositoryLock sync.Mutex
}

Expand All @@ -70,6 +74,7 @@ func (r *RegistryMemory) Init() {
r.Logger().WithError(err).Fatal("Access rule watcher terminated with an error.")
}
}()
r.HealthEventManager().Watch(context.Background())
_ = r.RuleRepository()
}

Expand Down Expand Up @@ -138,9 +143,23 @@ func (r *RegistryMemory) CredentialHandler() *api.CredentialsHandler {
return r.ch
}

func (r *RegistryMemory) HealthEventManager() health.EventManager {
if r.healthEventManager == nil {
var err error
rulesReadinessChecker := rulereadiness.NewReadinessHealthChecker()
if r.healthEventManager, err = health.NewDefaultHealthEventManager(rulesReadinessChecker); err != nil {
r.logger.WithError(err).Fatal("unable to instantiate new health event manager")
}
}
return r.healthEventManager
}

func (r *RegistryMemory) HealthHandler() *healthx.Handler {
r.RLock()
defer r.RUnlock()

if r.healthxHandler == nil {
r.healthxHandler = healthx.NewHandler(r.Writer(), r.BuildVersion(), healthx.ReadyCheckers{})
r.healthxHandler = healthx.NewHandler(r.Writer(), r.BuildVersion(), r.HealthEventManager().HealthxReadyCheckers())
}
return r.healthxHandler
}
Expand All @@ -154,7 +173,7 @@ func (r *RegistryMemory) RuleValidator() rule.Validator {

func (r *RegistryMemory) RuleRepository() rule.Repository {
if r.ruleRepository == nil {
r.ruleRepository = rule.NewRepositoryMemory(r)
r.ruleRepository = rule.NewRepositoryMemory(r, r.HealthEventManager())
}
return r.ruleRepository
}
Expand Down

0 comments on commit 01d8588

Please sign in to comment.