From ac7b76ae344a3d58111862209cf1fe160c1fdb1f Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 1 May 2018 16:22:23 +1200 Subject: [PATCH 01/11] probes: Added listener for k8s health probes --- probes/probes.go | 92 ++++++++++++++++++++++++++++++++++++ probes/probes_medium_test.go | 74 +++++++++++++++++++++++++++++ 2 files changed, 166 insertions(+) create mode 100644 probes/probes.go create mode 100644 probes/probes_medium_test.go diff --git a/probes/probes.go b/probes/probes.go new file mode 100644 index 0000000..715f804 --- /dev/null +++ b/probes/probes.go @@ -0,0 +1,92 @@ +// Copyright 2018 Jump Trading +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package probes defines a simpler HTTP listener for Kubernetes style +// liveness and readiness probes. +package probes + +import ( + "fmt" + "net/http" + "sync" + "sync/atomic" +) + +// Listen starts a simple HTTP listener for responding to Kubernetes +// liveness and readiness probes on the port specified. The returned +// Probes instance has methods for setting the liveness and readiness +// states. +// +// Liveness probes are served at /healthz. +// Readiness probes are served at /readyz. +func Listen(port int) *Probes { + p := &Probes{ + alive: new(atomic.Value), + ready: new(atomic.Value), + server: &http.Server{ + Addr: fmt.Sprintf(":%d", port), + }, + } + p.alive.Store(true) + p.ready.Store(false) + + mux := http.NewServeMux() + mux.HandleFunc("/healthz", newHandler(p.alive)) + mux.HandleFunc("/readyz", newHandler(p.ready)) + p.server.Handler = mux + + p.wg.Add(1) + go func() { + defer p.wg.Done() + p.server.ListenAndServe() + }() + + return p +} + +// Probes contains a simple HTTP listener for serving Kubernetes +// liveness and readiness probes. +type Probes struct { + alive *atomic.Value + ready *atomic.Value + server *http.Server + wg sync.WaitGroup +} + +// SetAlive set the liveness state - true means alive/healthy. +func (p *Probes) SetAlive(alive bool) { + p.alive.Store(alive) +} + +// SetReady set the readiness state - true means ready. +func (p *Probes) SetReady(ready bool) { + p.ready.Store(ready) +} + +// Close shuts down the probes listener. It blocks until the listener +// has stopped. +func (p *Probes) Close() { + p.server.Close() + p.wg.Wait() +} + +func newHandler(value *atomic.Value) http.HandlerFunc { + return func(w http.ResponseWriter, _ *http.Request) { + if value.Load().(bool) { + w.WriteHeader(http.StatusOK) + return + } + http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable) + } +} diff --git a/probes/probes_medium_test.go b/probes/probes_medium_test.go new file mode 100644 index 0000000..84da62b --- /dev/null +++ b/probes/probes_medium_test.go @@ -0,0 +1,74 @@ +// Copyright 2018 Jump Trading +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build medium + +package probes_test + +import ( + "fmt" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jumptrading/influx-spout/probes" +) + +const probesPort = 44450 + +func TestProbes(t *testing.T) { + p := probes.Listen(probesPort) + defer p.Close() + + // Starting state is alive but not ready. + assertAlive(t) + assertNotReady(t) + + // Toggle alive. + p.SetAlive(false) + assertNotAlive(t) + p.SetAlive(true) + assertAlive(t) + + // Toggle ready. + p.SetReady(true) + assertReady(t) + p.SetReady(false) + assertNotReady(t) +} + +func assertAlive(t *testing.T) { + assertProbe(t, "healthz", http.StatusOK) +} + +func assertNotAlive(t *testing.T) { + assertProbe(t, "healthz", http.StatusServiceUnavailable) +} + +func assertReady(t *testing.T) { + assertProbe(t, "readyz", http.StatusOK) +} + +func assertNotReady(t *testing.T) { + assertProbe(t, "readyz", http.StatusServiceUnavailable) +} + +func assertProbe(t *testing.T, path string, expectedStatus int) { + url := fmt.Sprintf("http://localhost:%d/%s", probesPort, path) + resp, err := http.Get(url) + require.NoError(t, err) + assert.Equal(t, expectedStatus, resp.StatusCode) +} From 0461ccf8f3b1b66f871da57a51000f044c22cb01 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 1 May 2018 17:03:46 +1200 Subject: [PATCH 02/11] config: Add probe_port This config option sets the port for the K8s probe listener. It defaults to 0 (off). --- README.md | 20 ++++++++++++++++++++ config/config.go | 2 ++ config/config_small_test.go | 5 +++++ 3 files changed, 27 insertions(+) diff --git a/README.md b/README.md index 2bc56f7..c65408f 100644 --- a/README.md +++ b/README.md @@ -135,6 +135,10 @@ read_buffer_bytes = 4194304 # The listener will publish its own metrics to this NATS subject (for consumption # by the monitor). nats_subject_monitor = "influx-spout-monitor" + +# The listener will serve Kubernetes liveness and readiness probes on this port +# at /healthz and /readyz. Set to 0 (the default) to disable probes support. +probe_port = 0 ``` ### HTTP Listener @@ -179,6 +183,10 @@ listener_batch_bytes = 1048576 # The HTTP listener will publish its own metrics to this NATS subject (for # consumption by the monitor). nats_subject_monitor = "influx-spout-monitor" + +# The HTTP listener will serve Kubernetes liveness and readiness probes on this +# port at /healthz and /readyz. Set to 0 (the default) to disable probes support. +probe_port = 0 ``` ### Filter @@ -216,6 +224,10 @@ nats_pending_max_mb = 200 # The number of filter workers to spawn. workers = 8 +# The filter will serve Kubernetes liveness and readiness probes on this port at +# /healthz and /readyz. Set to 0 (the default) to disable probes support. +probe_port = 0 + # Incoming metrics with timestamps ± this value from the current time will be # rejected. Metrics with timestamps that are significantly different from previously # written timestamps negatively impact InfluxDB performance. @@ -324,6 +336,10 @@ nats_pending_max_mb = 200 # The writer will publish its own metrics to this NATS subject (for consumption # by the monitor). nats_subject_monitor = "influx-spout-monitor" + +# The writer will serve Kubernetes liveness and readiness probes on this port at +# /healthz and /readyz. Set to 0 (the default) to disable probes support. +probe_port = 0 ``` A writer will batch up messages until one of the limits defined by the @@ -356,6 +372,10 @@ nats_subject_monitor = "influx-spout-monitor" # The TCP port where the monitor will serve Prometheus formatted metrics over HTTP. port = 9331 + +# The monitor will serve Kubernetes liveness and readiness probes on this port +# at /healthz and /readyz. Set to 0 (the default) to disable probes support. +probe_port = 0 ``` ## Running tests diff --git a/config/config.go b/config/config.go index 78aae41..2ea3131 100644 --- a/config/config.go +++ b/config/config.go @@ -51,6 +51,7 @@ type Config struct { ListenerBatchBytes int `toml:"listener_batch_bytes"` Rule []Rule `toml:"rule"` MaxTimeDeltaSecs int `toml:"max_time_delta_secs"` + ProbePort int `toml:"probe_port"` Debug bool `toml:"debug"` } @@ -79,6 +80,7 @@ func newDefaultConfig() *Config { NATSPendingMaxMB: 200, ListenerBatchBytes: 1024 * 1024, MaxTimeDeltaSecs: 600, + ProbePort: 0, } } diff --git a/config/config_small_test.go b/config/config_small_test.go index 5007d7e..6301cf0 100644 --- a/config/config_small_test.go +++ b/config/config_small_test.go @@ -55,6 +55,8 @@ read_buffer_bytes = 43210 nats_pending_max_mb = 100 listener_batch_bytes = 4096 max_time_delta_secs = 789 + +probe_port = 6789 ` conf, err := parseConfig(validConfigSample) require.NoError(t, err, "Couldn't parse a valid config: %v\n", err) @@ -79,6 +81,8 @@ max_time_delta_secs = 789 assert.Equal(t, "spout", conf.NATSSubject[0], "Subject must match") assert.Equal(t, "spout-monitor", conf.NATSSubjectMonitor, "Monitor subject must match") assert.Equal(t, "nats://localhost:4222", conf.NATSAddress, "Address must match") + + assert.Equal(t, 6789, conf.ProbePort) } func TestAllDefaults(t *testing.T) { @@ -104,6 +108,7 @@ func TestAllDefaults(t *testing.T) { assert.Equal(t, 200, conf.NATSPendingMaxMB) assert.Equal(t, 1048576, conf.ListenerBatchBytes) assert.Equal(t, 600, conf.MaxTimeDeltaSecs) + assert.Equal(t, 0, conf.ProbePort) assert.Equal(t, false, conf.Debug) assert.Len(t, conf.Rule, 0) } From e2b485ece3a25a4f3e71edaa81e7caa3a03bd166 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 1 May 2018 17:09:50 +1200 Subject: [PATCH 03/11] probes: Make Probes an interface In preparation for a "do nothing" listener. --- probes/probes.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/probes/probes.go b/probes/probes.go index 715f804..ec66a8d 100644 --- a/probes/probes.go +++ b/probes/probes.go @@ -23,6 +23,13 @@ import ( "sync/atomic" ) +// Probes defines the available operations for a probes listener. +type Probes interface { + SetAlive(bool) + SetReady(bool) + Close() +} + // Listen starts a simple HTTP listener for responding to Kubernetes // liveness and readiness probes on the port specified. The returned // Probes instance has methods for setting the liveness and readiness @@ -30,8 +37,8 @@ import ( // // Liveness probes are served at /healthz. // Readiness probes are served at /readyz. -func Listen(port int) *Probes { - p := &Probes{ +func Listen(port int) Probes { + p := &listener{ alive: new(atomic.Value), ready: new(atomic.Value), server: &http.Server{ @@ -55,9 +62,7 @@ func Listen(port int) *Probes { return p } -// Probes contains a simple HTTP listener for serving Kubernetes -// liveness and readiness probes. -type Probes struct { +type listener struct { alive *atomic.Value ready *atomic.Value server *http.Server @@ -65,18 +70,18 @@ type Probes struct { } // SetAlive set the liveness state - true means alive/healthy. -func (p *Probes) SetAlive(alive bool) { +func (p *listener) SetAlive(alive bool) { p.alive.Store(alive) } // SetReady set the readiness state - true means ready. -func (p *Probes) SetReady(ready bool) { +func (p *listener) SetReady(ready bool) { p.ready.Store(ready) } // Close shuts down the probes listener. It blocks until the listener // has stopped. -func (p *Probes) Close() { +func (p *listener) Close() { p.server.Close() p.wg.Wait() } From 8b32c5e5b847c7cc8e5fc8101252eac0f357f188 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 1 May 2018 17:14:33 +1200 Subject: [PATCH 04/11] probes: Return nullListener when port is 0 --- probes/probes.go | 23 +++++++++++++++++++---- probes/probes_medium_test.go | 17 +++++++++++++---- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/probes/probes.go b/probes/probes.go index ec66a8d..29ee25b 100644 --- a/probes/probes.go +++ b/probes/probes.go @@ -25,8 +25,14 @@ import ( // Probes defines the available operations for a probes listener. type Probes interface { + // SetAlive sets the liveness state - true means alive/healthy. SetAlive(bool) + + // SetReady sets the readiness state - true means ready. SetReady(bool) + + // Close shuts down the probes listener. It blocks until the + // listener has stopped. Close() } @@ -37,7 +43,14 @@ type Probes interface { // // Liveness probes are served at /healthz. // Readiness probes are served at /readyz. +// +// If port is 0 or less, no listener is started and a "do nothing" +// instance is returned. func Listen(port int) Probes { + if port <= 0 { + return new(nullListener) + } + p := &listener{ alive: new(atomic.Value), ready: new(atomic.Value), @@ -69,18 +82,14 @@ type listener struct { wg sync.WaitGroup } -// SetAlive set the liveness state - true means alive/healthy. func (p *listener) SetAlive(alive bool) { p.alive.Store(alive) } -// SetReady set the readiness state - true means ready. func (p *listener) SetReady(ready bool) { p.ready.Store(ready) } -// Close shuts down the probes listener. It blocks until the listener -// has stopped. func (p *listener) Close() { p.server.Close() p.wg.Wait() @@ -95,3 +104,9 @@ func newHandler(value *atomic.Value) http.HandlerFunc { http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable) } } + +type nullListener struct{} + +func (p *nullListener) SetAlive(bool) {} +func (p *nullListener) SetReady(bool) {} +func (p *nullListener) Close() {} diff --git a/probes/probes_medium_test.go b/probes/probes_medium_test.go index 84da62b..f9c3a00 100644 --- a/probes/probes_medium_test.go +++ b/probes/probes_medium_test.go @@ -14,7 +14,7 @@ // +build medium -package probes_test +package probes import ( "fmt" @@ -23,14 +23,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "github.com/jumptrading/influx-spout/probes" ) const probesPort = 44450 func TestProbes(t *testing.T) { - p := probes.Listen(probesPort) + p := Listen(probesPort) defer p.Close() // Starting state is alive but not ready. @@ -50,6 +48,17 @@ func TestProbes(t *testing.T) { assertNotReady(t) } +func TestDisabled(t *testing.T) { + p := Listen(0) + defer p.Close() + + assert.IsType(t, new(nullListener), p) + + // Exercise methods (won't do anything). + p.SetAlive(false) + p.SetReady(false) +} + func assertAlive(t *testing.T) { assertProbe(t, "healthz", http.StatusOK) } From b473079b6775ac8d6a5009cb3b3c635de8eec639 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 8 May 2018 10:42:08 +1200 Subject: [PATCH 05/11] listener: Expose k8s probe endpoints The listener tests now use readiness probes instead of the Ready channel (now removed). --- listener/listener.go | 35 ++++++++++++++++---------------- listener/listener_medium_test.go | 18 ++++++---------- spouttest/asserts.go | 22 ++++++++++++++++++++ 3 files changed, 45 insertions(+), 30 deletions(-) diff --git a/listener/listener.go b/listener/listener.go index 05c5044..b61bb6b 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -29,6 +29,7 @@ import ( "github.com/nats-io/go-nats" "github.com/jumptrading/influx-spout/config" + "github.com/jumptrading/influx-spout/probes" "github.com/jumptrading/influx-spout/stats" ) @@ -98,45 +99,43 @@ func StartHTTPListener(c *config.Config) (*Listener, error) { // UDP or HTTP, batches them and then publishes them to a NATS // subject. type Listener struct { - c *config.Config - nc *nats.Conn - stats *stats.Stats + c *config.Config + nc *nats.Conn + stats *stats.Stats + probes probes.Probes buf []byte batchSize int batchSizeThreshold int - wg sync.WaitGroup - ready chan struct{} // Is close once the listener is listening - stop chan struct{} -} - -// Ready returns a channel which is closed once the listener is -// actually listening for incoming data. -func (l *Listener) Ready() <-chan struct{} { - return l.ready + wg sync.WaitGroup + stop chan struct{} } // Stop shuts down a running listener. It should be called exactly // once for every Listener instance. func (l *Listener) Stop() { + l.probes.SetReady(false) + l.probes.SetAlive(false) + close(l.stop) l.wg.Wait() l.nc.Close() + l.probes.Close() } func newListener(c *config.Config) (*Listener, error) { l := &Listener{ - c: c, - ready: make(chan struct{}), - stop: make(chan struct{}), + c: c, + stop: make(chan struct{}), stats: stats.New( statReceived, statSent, statReadErrors, statFailedNATSPublish, ), - buf: make([]byte, c.ListenerBatchBytes), + probes: probes.Listen(c.ProbePort), + buf: make([]byte, c.ListenerBatchBytes), // If more than batchSizeThreshold bytes has been written to // the current batch buffer, the batch will be sent. We allow @@ -190,7 +189,7 @@ func (l *Listener) listenUDP(sc *net.UDPConn) { l.wg.Done() }() - close(l.ready) + l.probes.SetReady(true) for { sc.SetReadDeadline(time.Now().Add(time.Second)) sz, _, err := sc.ReadFromUDP(l.buf[l.batchSize:]) @@ -238,7 +237,7 @@ func (l *Listener) listenHTTP(server *http.Server) { defer l.wg.Done() go func() { - close(l.ready) + l.probes.SetReady(true) err := server.ListenAndServe() if err == nil || err == http.ErrServerClosed { return diff --git a/listener/listener_medium_test.go b/listener/listener_medium_test.go index 3950410..60089bb 100644 --- a/listener/listener_medium_test.go +++ b/listener/listener_medium_test.go @@ -37,6 +37,7 @@ import ( const ( natsPort = 44444 listenPort = 44445 + probePort = 44446 natsSubject = "listener-test" natsMonitorSubject = natsSubject + "-monitor" ) @@ -81,6 +82,7 @@ func testConfig() *config.Config { ReadBufferBytes: 4 * 1024 * 1024, ListenerBatchBytes: 1024 * 1024, Port: listenPort, + ProbePort: probePort, } } @@ -181,9 +183,10 @@ loop: } func TestHTTPListener(t *testing.T) { - listener, err := StartHTTPListener(testConfig()) + conf := testConfig() + listener, err := StartHTTPListener(conf) require.NoError(t, err) - assertListenerStarted(t, listener) + spouttest.AssertReadyProbe(t, conf.ProbePort) defer listener.Stop() listenerCh, unsubListener := subListener(t) @@ -228,19 +231,10 @@ func BenchmarkListenerLatency(b *testing.B) { func startListener(t require.TestingT, conf *config.Config) *Listener { listener, err := StartListener(conf) require.NoError(t, err) - assertListenerStarted(t, listener) + spouttest.AssertReadyProbe(t, conf.ProbePort) return listener } -func assertListenerStarted(t require.TestingT, listener *Listener) { - select { - case <-listener.Ready(): - case <-time.After(spouttest.LongWait): - listener.Stop() - t.Errorf("listener failed to start up") - } -} - // dialListener creates a UDP connection to the listener's inbound port. func dialListener(t require.TestingT) *net.UDPConn { saddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("0.0.0.0:%d", listenPort)) diff --git a/spouttest/asserts.go b/spouttest/asserts.go index d2ef4dc..24478a2 100644 --- a/spouttest/asserts.go +++ b/spouttest/asserts.go @@ -2,6 +2,7 @@ package spouttest import ( "fmt" + "net/http" "strconv" "strings" "testing" @@ -95,3 +96,24 @@ func stripTimestamp(t *testing.T, s string) string { // Strip off the timestamp return s[:i] } + +// AssertReadyProbe repeatedly tries a readiness probe on the given +// port. It will fail a test if no successful readiness probe is +// observed within LongWait. +func AssertReadyProbe(t require.TestingT, probePort int) { + require.True(t, probePort > 0, "probe port must be greater than 0") + maxTime := time.Now().Add(LongWait) + + url := fmt.Sprintf("http://localhost:%d/readyz", probePort) + client := &http.Client{Timeout: time.Second} + for time.Now().Before(maxTime) { + resp, err := client.Get(url) + if err == nil && resp.StatusCode == 200 { + return + } + time.Sleep(200 * time.Millisecond) + } + + t.Errorf("timed out waiting for successful ready probe") + t.FailNow() +} From a536c498a2226b51a3c2c12796a003ea94919f7c Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 8 May 2018 10:49:31 +1200 Subject: [PATCH 06/11] filter: Expose k8s probe endpoints --- filter/filter.go | 26 ++++++++++++++++++-------- filter/filter_medium_test.go | 15 +++++++++++---- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/filter/filter.go b/filter/filter.go index 0a4adc4..7198b08 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -24,6 +24,7 @@ import ( "time" "github.com/jumptrading/influx-spout/config" + "github.com/jumptrading/influx-spout/probes" "github.com/jumptrading/influx-spout/stats" "github.com/nats-io/go-nats" ) @@ -43,9 +44,10 @@ const ( // NATS topic. func StartFilter(conf *config.Config) (_ *Filter, err error) { f := &Filter{ - c: conf, - stop: make(chan struct{}), - wg: new(sync.WaitGroup), + c: conf, + stop: make(chan struct{}), + wg: new(sync.WaitGroup), + probes: probes.Listen(conf.ProbePort), } defer func() { if err != nil { @@ -104,6 +106,8 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) { log.Printf("filter subscribed to [%s] at %s with %d rules\n", f.c.NATSSubject[0], f.c.NATSAddress, rules.Count()) + + f.probes.SetReady(true) return f, nil } @@ -141,15 +145,19 @@ type natsConn interface { // Filter is a struct that contains the configuration we are running with // and the NATS bus connection type Filter struct { - c *config.Config - nc natsConn - sub *nats.Subscription - wg *sync.WaitGroup - stop chan struct{} + c *config.Config + nc natsConn + sub *nats.Subscription + wg *sync.WaitGroup + probes probes.Probes + stop chan struct{} } // Stop shuts down goroutines and closes resources related to the filter. func (f *Filter) Stop() { + f.probes.SetReady(false) + f.probes.SetAlive(false) + // Stop receiving lines to filter. f.sub.Unsubscribe() @@ -161,6 +169,8 @@ func (f *Filter) Stop() { if f.nc != nil { f.nc.Close() } + + f.probes.Close() } // startStatistician defines a goroutine that is responsible for diff --git a/filter/filter_medium_test.go b/filter/filter_medium_test.go index 2489cb5..5a0e35f 100644 --- a/filter/filter_medium_test.go +++ b/filter/filter_medium_test.go @@ -30,6 +30,7 @@ import ( ) const natsPort = 44446 +const probePort = 44447 func testConfig() *config.Config { return &config.Config{ @@ -46,6 +47,7 @@ func testConfig() *config.Config { Match: "hello", Subject: "hello-subject", }}, + ProbePort: probePort, } } @@ -55,8 +57,7 @@ func TestFilterWorker(t *testing.T) { conf := testConfig() - filter, err := StartFilter(conf) - require.NoError(t, err) + filter := startFilter(t, conf) defer filter.Stop() nc, err := nats.Connect(conf.NATSAddress) @@ -123,8 +124,7 @@ func TestInvalidTimeStamps(t *testing.T) { conf := testConfig() conf.MaxTimeDeltaSecs = 10 - filter, err := StartFilter(conf) - require.NoError(t, err) + filter := startFilter(t, conf) defer filter.Stop() nc, err := nats.Connect(conf.NATSAddress) @@ -174,3 +174,10 @@ func TestInvalidTimeStamps(t *testing.T) { `triggered{component="filter",name="particle",rule="hello-subject"} 2`, }) } + +func startFilter(t *testing.T, conf *config.Config) *Filter { + filter, err := StartFilter(conf) + require.NoError(t, err) + spouttest.AssertReadyProbe(t, conf.ProbePort) + return filter +} From de9b218d1f6127cfcd70bbbee1f8f64b2e08dd5d Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 8 May 2018 11:35:25 +1200 Subject: [PATCH 07/11] Ensure component is stopped if readiness check fails Expose CheckReadyProbe as sometimes a failed assert isn't convenient. --- filter/filter_medium_test.go | 5 ++++- listener/listener_medium_test.go | 6 +++++- spouttest/asserts.go | 20 +++++++++++++++----- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/filter/filter_medium_test.go b/filter/filter_medium_test.go index 5a0e35f..b4bc33c 100644 --- a/filter/filter_medium_test.go +++ b/filter/filter_medium_test.go @@ -178,6 +178,9 @@ func TestInvalidTimeStamps(t *testing.T) { func startFilter(t *testing.T, conf *config.Config) *Filter { filter, err := StartFilter(conf) require.NoError(t, err) - spouttest.AssertReadyProbe(t, conf.ProbePort) + if !spouttest.CheckReadyProbe(conf.ProbePort) { + filter.Stop() + t.Fatal("filter not ready") + } return filter } diff --git a/listener/listener_medium_test.go b/listener/listener_medium_test.go index 60089bb..d1b8074 100644 --- a/listener/listener_medium_test.go +++ b/listener/listener_medium_test.go @@ -231,7 +231,11 @@ func BenchmarkListenerLatency(b *testing.B) { func startListener(t require.TestingT, conf *config.Config) *Listener { listener, err := StartListener(conf) require.NoError(t, err) - spouttest.AssertReadyProbe(t, conf.ProbePort) + if !spouttest.CheckReadyProbe(conf.ProbePort) { + listener.Stop() + t.Errorf("listener not ready") + t.FailNow() + } return listener } diff --git a/spouttest/asserts.go b/spouttest/asserts.go index 24478a2..6052a72 100644 --- a/spouttest/asserts.go +++ b/spouttest/asserts.go @@ -97,11 +97,13 @@ func stripTimestamp(t *testing.T, s string) string { return s[:i] } -// AssertReadyProbe repeatedly tries a readiness probe on the given -// port. It will fail a test if no successful readiness probe is +// CheckReadyProbe repeatedly tries a readiness probe on the given +// port. It will return true if a successful readiness probe is // observed within LongWait. -func AssertReadyProbe(t require.TestingT, probePort int) { - require.True(t, probePort > 0, "probe port must be greater than 0") +func CheckReadyProbe(probePort int) bool { + if probePort <= 0 { + panic("probe port must be greater than 0") + } maxTime := time.Now().Add(LongWait) url := fmt.Sprintf("http://localhost:%d/readyz", probePort) @@ -109,11 +111,19 @@ func AssertReadyProbe(t require.TestingT, probePort int) { for time.Now().Before(maxTime) { resp, err := client.Get(url) if err == nil && resp.StatusCode == 200 { - return + return true } time.Sleep(200 * time.Millisecond) } + return false +} + +// AssertReadyProbe fails a test if CheckReadyProbe returns false. +func AssertReadyProbe(t require.TestingT, probePort int) { + if CheckReadyProbe(probePort) { + return + } t.Errorf("timed out waiting for successful ready probe") t.FailNow() } From 3fa8eddc41b0ce45d521b9fb37132b47628784da Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 8 May 2018 11:36:03 +1200 Subject: [PATCH 08/11] writer: Expose k8s probe endpoints --- writer/writer.go | 10 ++++++++++ writer/writer_medium_test.go | 9 ++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/writer/writer.go b/writer/writer.go index 4a18539..e345211 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -33,6 +33,7 @@ import ( "github.com/jumptrading/influx-spout/config" "github.com/jumptrading/influx-spout/filter" + "github.com/jumptrading/influx-spout/probes" "github.com/jumptrading/influx-spout/stats" ) @@ -54,6 +55,7 @@ type Writer struct { rules *filter.RuleSet stats *stats.Stats wg sync.WaitGroup + probes probes.Probes stop chan struct{} } @@ -66,6 +68,7 @@ func StartWriter(c *config.Config) (_ *Writer, err error) { batchMaxBytes: c.BatchMaxMB * 1024 * 1024, batchMaxAge: time.Duration(c.BatchMaxSecs) * time.Second, stats: stats.New(statReceived, statWriteRequests, statFailedWrites, statMaxPending), + probes: probes.Listen(c.ProbePort), stop: make(chan struct{}), } defer func() { @@ -124,6 +127,8 @@ func StartWriter(c *config.Config) (_ *Writer, err error) { log.Printf("POST timeout: %ds", c.WriteTimeoutSecs) log.Printf("maximum NATS subject size: %dMB", c.NATSPendingMaxMB) + w.probes.SetReady(true) + return w, nil } @@ -131,11 +136,16 @@ func StartWriter(c *config.Config) (_ *Writer, err error) { // connection to NATS. It will be block until all Writer goroutines // have stopped. func (w *Writer) Stop() { + w.probes.SetReady(false) + w.probes.SetAlive(false) + close(w.stop) w.wg.Wait() if w.nc != nil { w.nc.Close() } + + w.probes.Close() } func (w *Writer) worker(jobs <-chan *nats.Msg) { diff --git a/writer/writer_medium_test.go b/writer/writer_medium_test.go index ee242a4..75149a5 100644 --- a/writer/writer_medium_test.go +++ b/writer/writer_medium_test.go @@ -33,8 +33,9 @@ import ( "github.com/jumptrading/influx-spout/spouttest" ) -const influxPort = 44445 const natsPort = 44443 +const influxPort = 44445 +const probePort = 44446 var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort) @@ -54,6 +55,7 @@ func testConfig() *config.Config { Port: influxPort, Workers: 96, NATSPendingMaxMB: 32, + ProbePort: probePort, } } @@ -302,6 +304,11 @@ func runGnatsd(t FatalTestingT) (*nats.Conn, func()) { func startWriter(t require.TestingT, conf *config.Config) *Writer { w, err := StartWriter(conf) require.NoError(t, err) + if !spouttest.CheckReadyProbe(conf.ProbePort) { + w.Stop() + t.Errorf("writer not ready") + t.FailNow() + } return w } From 55bf27b685375a8156ed037adeb7b747fef37a2f Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 8 May 2018 11:51:17 +1200 Subject: [PATCH 09/11] monitor: Expose k8s probe endpoints --- monitor/monitor.go | 31 ++++++++++++++++--------------- monitor/monitor_medium_test.go | 9 +++------ 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/monitor/monitor.go b/monitor/monitor.go index ad6e4ac..bb5bc78 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -21,9 +21,11 @@ import ( "net/http" "sync" + "github.com/nats-io/go-nats" + "github.com/jumptrading/influx-spout/config" + "github.com/jumptrading/influx-spout/probes" "github.com/jumptrading/influx-spout/prometheus" - "github.com/nats-io/go-nats" ) // Start initialises, starts and returns a new Monitor instance based @@ -31,9 +33,9 @@ import ( func Start(conf *config.Config) (_ *Monitor, err error) { m := &Monitor{ c: conf, - ready: make(chan struct{}), stop: make(chan struct{}), metrics: prometheus.NewMetricSet(), + probes: probes.Listen(conf.ProbePort), } defer func() { if err != nil { @@ -66,25 +68,22 @@ func Start(conf *config.Config) (_ *Monitor, err error) { // runtime statistics from the other influx-spout components and // makes them available via a HTTP endpoint in Prometheus format. type Monitor struct { - c *config.Config - nc *nats.Conn - sub *nats.Subscription - wg sync.WaitGroup - ready chan struct{} - stop chan struct{} + c *config.Config + nc *nats.Conn + sub *nats.Subscription + wg sync.WaitGroup + stop chan struct{} + probes probes.Probes mu sync.Mutex metrics *prometheus.MetricSet } -// Ready returns a channel which is closed once the monitor is -// actually listening for HTTP metrics requests. -func (m *Monitor) Ready() <-chan struct{} { - return m.ready -} - // Stop shuts down goroutines and closes resources related to the filter. func (m *Monitor) Stop() { + m.probes.SetReady(false) + m.probes.SetAlive(false) + // Stop receiving lines from NATS. m.sub.Unsubscribe() @@ -96,6 +95,8 @@ func (m *Monitor) Stop() { if m.nc != nil { m.nc.Close() } + + m.probes.Close() } func (m *Monitor) natsConnect() (*nats.Conn, error) { @@ -125,7 +126,7 @@ func (m *Monitor) serveHTTP() { } go func() { - close(m.ready) + m.probes.SetReady(true) err := server.ListenAndServe() if err == nil || err == http.ErrServerClosed { return diff --git a/monitor/monitor_medium_test.go b/monitor/monitor_medium_test.go index 1eeff64..615b633 100644 --- a/monitor/monitor_medium_test.go +++ b/monitor/monitor_medium_test.go @@ -34,6 +34,7 @@ import ( const natsPort = 44447 const httpPort = 44448 +const probePort = 44449 var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort) @@ -43,6 +44,7 @@ func testConfig() *config.Config { NATSAddress: natsAddress, NATSSubjectMonitor: "monitor-test-monitor", Port: httpPort, + ProbePort: probePort, } } @@ -55,12 +57,7 @@ func TestMonitor(t *testing.T) { mon, err := monitor.Start(conf) require.NoError(t, err) defer mon.Stop() - - select { - case <-mon.Ready(): - case <-time.After(spouttest.LongWait): - t.Fatal("timed out waiting for monitor to be ready") - } + spouttest.AssertReadyProbe(t, conf.ProbePort) publish := func(data []byte) { err := nc.Publish(conf.NATSSubjectMonitor, data) From 69b3777ac62eeea2b41d3632c192d22a7f260ab0 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 8 May 2018 12:01:09 +1200 Subject: [PATCH 10/11] Test readiness probes in end-to-end test --- spouttest/e2e_test.go | 62 +++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/spouttest/e2e_test.go b/spouttest/e2e_test.go index 0e76610..8bb66a1 100644 --- a/spouttest/e2e_test.go +++ b/spouttest/e2e_test.go @@ -38,13 +38,24 @@ import ( ) const ( - natsPort = 44500 - influxdPort = 44501 - listenerPort = 44502 - httpListenerPort = 44503 - monitorPort = 44504 - influxDBName = "test" - sendCount = 10 + natsPort = 44500 + influxdPort = 44501 + + listenerPort = 44502 + listenerProbePort = 55502 + + httpListenerPort = 44503 + httpListenerProbePort = 55503 + + filterProbePort = 55504 + + writerProbePort = 55505 + + monitorPort = 44506 + monitorProbePort = 55506 + + influxDBName = "test" + sendCount = 10 ) func TestEndToEnd(t *testing.T) { @@ -63,23 +74,23 @@ func TestEndToEnd(t *testing.T) { // Start spout components. listener := startListener(t, fs) defer listener.Stop() + spouttest.AssertReadyProbe(t, listenerProbePort) httpListener := startHTTPListener(t, fs) defer httpListener.Stop() + spouttest.AssertReadyProbe(t, httpListenerProbePort) filter := startFilter(t, fs) defer filter.Stop() + spouttest.AssertReadyProbe(t, filterProbePort) writer := startWriter(t, fs) defer writer.Stop() + spouttest.AssertReadyProbe(t, writerProbePort) monitor := startMonitor(t, fs) defer monitor.Stop() - - // Make sure the listeners & monitor are actually listening. - assertReady(t, listener) - assertReady(t, httpListener) - assertReady(t, monitor) + spouttest.AssertReadyProbe(t, monitorProbePort) // Connect to the listener. addr := net.JoinHostPort("localhost", strconv.Itoa(listenerPort)) @@ -165,18 +176,6 @@ $`[1:]) t.Fatalf("Failed to see expected metrics. Last saw:\n%s", lines) } -type HasReady interface { - Ready() <-chan struct{} -} - -func assertReady(t *testing.T, component interface{}) { - select { - case <-component.(HasReady).Ready(): - case <-time.After(spouttest.LongWait): - t.Fatal("timeout out waiting for component to be ready") - } -} - const cpuLine = "cpu,env=prod,cls=server user=13.33,usage_system=0.16,usage_idle=86.53" func makeTestLines() *bytes.Buffer { @@ -200,7 +199,8 @@ nats_address = "nats://localhost:%d" batch = 5 debug = true nats_subject_monitor = "monitor" -`, listenerPort, natsPort)) +probe_port = %d +`, listenerPort, natsPort, listenerProbePort)) } func startHTTPListener(t *testing.T, fs afero.Fs) cmd.Stoppable { @@ -211,7 +211,8 @@ nats_address = "nats://localhost:%d" batch = 5 debug = true nats_subject_monitor = "monitor" -`, httpListenerPort, natsPort)) +probe_port = %d +`, httpListenerPort, natsPort, httpListenerProbePort)) } func startFilter(t *testing.T, fs afero.Fs) cmd.Stoppable { @@ -220,12 +221,13 @@ mode = "filter" nats_address = "nats://localhost:%d" debug = true nats_subject_monitor = "monitor" +probe_port = %d [[rule]] type = "basic" match = "cpu" subject = "system" -`, natsPort)) +`, natsPort, filterProbePort)) } func startWriter(t *testing.T, fs afero.Fs) cmd.Stoppable { @@ -239,7 +241,8 @@ batch = 1 workers = 4 debug = true nats_subject_monitor = "monitor" -`, natsPort, influxdPort, influxDBName)) +probe_port = %d +`, natsPort, influxdPort, influxDBName, writerProbePort)) } func startMonitor(t *testing.T, fs afero.Fs) cmd.Stoppable { @@ -248,7 +251,8 @@ mode = "monitor" nats_address = "nats://localhost:%d" nats_subject_monitor = "monitor" port = %d -`, natsPort, monitorPort)) +probe_port = %d +`, natsPort, monitorPort, monitorProbePort)) } func startComponent(t *testing.T, fs afero.Fs, name, config string) cmd.Stoppable { From 830386c8c6af21130125341e33130dfdbb593053 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 8 May 2018 12:12:26 +1200 Subject: [PATCH 11/11] Be systematic about ports used in tests Avoid port collisions when tests are run in parallel. --- filter/filter_medium_test.go | 4 ++-- listener/listener_medium_test.go | 6 +++--- monitor/monitor_medium_test.go | 6 +++--- spouttest/e2e_test.go | 30 +++++++++++++++--------------- writer/writer_medium_test.go | 6 +++--- 5 files changed, 26 insertions(+), 26 deletions(-) diff --git a/filter/filter_medium_test.go b/filter/filter_medium_test.go index b4bc33c..e1d39e0 100644 --- a/filter/filter_medium_test.go +++ b/filter/filter_medium_test.go @@ -29,8 +29,8 @@ import ( "github.com/jumptrading/influx-spout/spouttest" ) -const natsPort = 44446 -const probePort = 44447 +const natsPort = 44100 +const probePort = 44101 func testConfig() *config.Config { return &config.Config{ diff --git a/listener/listener_medium_test.go b/listener/listener_medium_test.go index d1b8074..2b8bdd7 100644 --- a/listener/listener_medium_test.go +++ b/listener/listener_medium_test.go @@ -35,9 +35,9 @@ import ( ) const ( - natsPort = 44444 - listenPort = 44445 - probePort = 44446 + natsPort = 44000 + listenPort = 44001 + probePort = 44002 natsSubject = "listener-test" natsMonitorSubject = natsSubject + "-monitor" ) diff --git a/monitor/monitor_medium_test.go b/monitor/monitor_medium_test.go index 615b633..d2d8b61 100644 --- a/monitor/monitor_medium_test.go +++ b/monitor/monitor_medium_test.go @@ -32,9 +32,9 @@ import ( "github.com/jumptrading/influx-spout/spouttest" ) -const natsPort = 44447 -const httpPort = 44448 -const probePort = 44449 +const natsPort = 44300 +const httpPort = 44301 +const probePort = 44302 var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort) diff --git a/spouttest/e2e_test.go b/spouttest/e2e_test.go index 8bb66a1..d29d25d 100644 --- a/spouttest/e2e_test.go +++ b/spouttest/e2e_test.go @@ -38,21 +38,21 @@ import ( ) const ( - natsPort = 44500 - influxdPort = 44501 + natsPort = 44600 + influxdPort = 44601 - listenerPort = 44502 - listenerProbePort = 55502 + listenerPort = 44610 + listenerProbePort = 44611 - httpListenerPort = 44503 - httpListenerProbePort = 55503 + httpListenerPort = 44620 + httpListenerProbePort = 44621 - filterProbePort = 55504 + filterProbePort = 44631 - writerProbePort = 55505 + writerProbePort = 44641 - monitorPort = 44506 - monitorProbePort = 55506 + monitorPort = 44650 + monitorProbePort = 44651 influxDBName = "test" sendCount = 10 @@ -143,20 +143,20 @@ func TestEndToEnd(t *testing.T) { expectedMetrics := regexp.MustCompile(` failed_nats_publish{component="filter",name="filter"} 0 failed_nats_publish{component="listener",name="listener"} 0 -failed_writes{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} 0 +failed_writes{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44601",name="writer"} 0 invalid_time{component="filter",name="filter"} 0 -max_pending{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} \d+ +max_pending{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44601",name="writer"} \d+ nats_dropped{component="filter",name="filter"} 0 -nats_dropped{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer",subject="system"} 0 +nats_dropped{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44601",name="writer",subject="system"} 0 passed{component="filter",name="filter"} 10 processed{component="filter",name="filter"} 20 read_errors{component="listener",name="listener"} 0 received{component="listener",name="listener"} 5 -received{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} 2 +received{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44601",name="writer"} 2 rejected{component="filter",name="filter"} 10 sent{component="listener",name="listener"} 1 triggered{component="filter",name="filter",rule="system"} 10 -write_requests{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} 2 +write_requests{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44601",name="writer"} 2 $`[1:]) var lines string for try := 0; try < 20; try++ { diff --git a/writer/writer_medium_test.go b/writer/writer_medium_test.go index 75149a5..6d4e491 100644 --- a/writer/writer_medium_test.go +++ b/writer/writer_medium_test.go @@ -33,9 +33,9 @@ import ( "github.com/jumptrading/influx-spout/spouttest" ) -const natsPort = 44443 -const influxPort = 44445 -const probePort = 44446 +const natsPort = 44200 +const influxPort = 44201 +const probePort = 44202 var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort)