Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #75 from mjs/k8s-probes
Browse files Browse the repository at this point in the history
Expose k8s probe endpoints for all components
  • Loading branch information
oplehto committed May 8, 2018
2 parents 9e95499 + 830386c commit 5aee4d3
Show file tree
Hide file tree
Showing 15 changed files with 393 additions and 103 deletions.
20 changes: 20 additions & 0 deletions README.md
Expand Up @@ -135,6 +135,10 @@ read_buffer_bytes = 4194304
# The listener will publish its own metrics to this NATS subject (for consumption
# by the monitor).
nats_subject_monitor = "influx-spout-monitor"

# The listener will serve Kubernetes liveness and readiness probes on this port
# at /healthz and /readyz. Set to 0 (the default) to disable probe support.
probe_port = 0
```

### HTTP Listener
Expand Down Expand Up @@ -179,6 +183,10 @@ listener_batch_bytes = 1048576
# The HTTP listener will publish its own metrics to this NATS subject (for
# consumption by the monitor).
nats_subject_monitor = "influx-spout-monitor"

# The HTTP listener will serve Kubernetes liveness and readiness probes on this
# port at /healthz and /readyz. Set to 0 (the default) to disable probe support.
probe_port = 0
```

### Filter
Expand Down Expand Up @@ -216,6 +224,10 @@ nats_pending_max_mb = 200
# The number of filter workers to spawn.
workers = 8

# The filter will serve Kubernetes liveness and readiness probes on this port at
# /healthz and /readyz. Set to 0 (the default) to disable probe support.
probe_port = 0

# Incoming metrics whose timestamps differ from the current time by more than
# this value (in either direction) will be rejected. Metrics with timestamps that
# are significantly different from previously written timestamps negatively
# impact InfluxDB performance.
Expand Down Expand Up @@ -324,6 +336,10 @@ nats_pending_max_mb = 200
# The writer will publish its own metrics to this NATS subject (for consumption
# by the monitor).
nats_subject_monitor = "influx-spout-monitor"

# The writer will serve Kubernetes liveness and readiness probes on this port at
# /healthz and /readyz. Set to 0 (the default) to disable probe support.
probe_port = 0
```

A writer will batch up messages until one of the limits defined by the
Expand Down Expand Up @@ -356,6 +372,10 @@ nats_subject_monitor = "influx-spout-monitor"

# The TCP port where the monitor will serve Prometheus formatted metrics over HTTP.
port = 9331

# The monitor will serve Kubernetes liveness and readiness probes on this port
# at /healthz and /readyz. Set to 0 (the default) to disable probe support.
probe_port = 0
```

## Running tests
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Expand Up @@ -51,6 +51,7 @@ type Config struct {
ListenerBatchBytes int `toml:"listener_batch_bytes"`
Rule []Rule `toml:"rule"`
MaxTimeDeltaSecs int `toml:"max_time_delta_secs"`
ProbePort int `toml:"probe_port"`
Debug bool `toml:"debug"`
}

Expand Down Expand Up @@ -79,6 +80,7 @@ func newDefaultConfig() *Config {
NATSPendingMaxMB: 200,
ListenerBatchBytes: 1024 * 1024,
MaxTimeDeltaSecs: 600,
ProbePort: 0,
}
}

Expand Down
5 changes: 5 additions & 0 deletions config/config_small_test.go
Expand Up @@ -55,6 +55,8 @@ read_buffer_bytes = 43210
nats_pending_max_mb = 100
listener_batch_bytes = 4096
max_time_delta_secs = 789
probe_port = 6789
`
conf, err := parseConfig(validConfigSample)
require.NoError(t, err, "Couldn't parse a valid config: %v\n", err)
Expand All @@ -79,6 +81,8 @@ max_time_delta_secs = 789
assert.Equal(t, "spout", conf.NATSSubject[0], "Subject must match")
assert.Equal(t, "spout-monitor", conf.NATSSubjectMonitor, "Monitor subject must match")
assert.Equal(t, "nats://localhost:4222", conf.NATSAddress, "Address must match")

assert.Equal(t, 6789, conf.ProbePort)
}

func TestAllDefaults(t *testing.T) {
Expand All @@ -104,6 +108,7 @@ func TestAllDefaults(t *testing.T) {
assert.Equal(t, 200, conf.NATSPendingMaxMB)
assert.Equal(t, 1048576, conf.ListenerBatchBytes)
assert.Equal(t, 600, conf.MaxTimeDeltaSecs)
assert.Equal(t, 0, conf.ProbePort)
assert.Equal(t, false, conf.Debug)
assert.Len(t, conf.Rule, 0)
}
Expand Down
26 changes: 18 additions & 8 deletions filter/filter.go
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/probes"
"github.com/jumptrading/influx-spout/stats"
"github.com/nats-io/go-nats"
)
Expand All @@ -43,9 +44,10 @@ const (
// NATS topic.
func StartFilter(conf *config.Config) (_ *Filter, err error) {
f := &Filter{
c: conf,
stop: make(chan struct{}),
wg: new(sync.WaitGroup),
c: conf,
stop: make(chan struct{}),
wg: new(sync.WaitGroup),
probes: probes.Listen(conf.ProbePort),
}
defer func() {
if err != nil {
Expand Down Expand Up @@ -104,6 +106,8 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) {

log.Printf("filter subscribed to [%s] at %s with %d rules\n",
f.c.NATSSubject[0], f.c.NATSAddress, rules.Count())

f.probes.SetReady(true)
return f, nil
}

Expand Down Expand Up @@ -141,15 +145,19 @@ type natsConn interface {
// Filter is a struct that contains the configuration we are running with
// and the NATS bus connection
type Filter struct {
c *config.Config
nc natsConn
sub *nats.Subscription
wg *sync.WaitGroup
stop chan struct{}
c *config.Config
nc natsConn
sub *nats.Subscription
wg *sync.WaitGroup
probes probes.Probes
stop chan struct{}
}

// Stop shuts down goroutines and closes resources related to the filter.
func (f *Filter) Stop() {
f.probes.SetReady(false)
f.probes.SetAlive(false)

// Stop receiving lines to filter.
f.sub.Unsubscribe()

Expand All @@ -161,6 +169,8 @@ func (f *Filter) Stop() {
if f.nc != nil {
f.nc.Close()
}

f.probes.Close()
}

// startStatistician defines a goroutine that is responsible for
Expand Down
20 changes: 15 additions & 5 deletions filter/filter_medium_test.go
Expand Up @@ -29,7 +29,8 @@ import (
"github.com/jumptrading/influx-spout/spouttest"
)

const natsPort = 44446
const natsPort = 44100
const probePort = 44101

func testConfig() *config.Config {
return &config.Config{
Expand All @@ -46,6 +47,7 @@ func testConfig() *config.Config {
Match: "hello",
Subject: "hello-subject",
}},
ProbePort: probePort,
}
}

Expand All @@ -55,8 +57,7 @@ func TestFilterWorker(t *testing.T) {

conf := testConfig()

filter, err := StartFilter(conf)
require.NoError(t, err)
filter := startFilter(t, conf)
defer filter.Stop()

nc, err := nats.Connect(conf.NATSAddress)
Expand Down Expand Up @@ -123,8 +124,7 @@ func TestInvalidTimeStamps(t *testing.T) {
conf := testConfig()
conf.MaxTimeDeltaSecs = 10

filter, err := StartFilter(conf)
require.NoError(t, err)
filter := startFilter(t, conf)
defer filter.Stop()

nc, err := nats.Connect(conf.NATSAddress)
Expand Down Expand Up @@ -174,3 +174,13 @@ func TestInvalidTimeStamps(t *testing.T) {
`triggered{component="filter",name="particle",rule="hello-subject"} 2`,
})
}

// startFilter starts a filter using conf and returns it. The test is failed
// immediately if StartFilter returns an error or if
// spouttest.CheckReadyProbe reports false for conf.ProbePort; in the latter
// case the filter is stopped before the test is aborted. On success the
// caller is responsible for calling Stop on the returned filter.
func startFilter(t *testing.T, conf *config.Config) *Filter {
	// Report failures at the calling test's line rather than inside this helper.
	t.Helper()

	filter, err := StartFilter(conf)
	require.NoError(t, err)
	if !spouttest.CheckReadyProbe(conf.ProbePort) {
		// Release the filter's resources before aborting the test.
		filter.Stop()
		t.Fatal("filter not ready")
	}
	return filter
}
35 changes: 17 additions & 18 deletions listener/listener.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/nats-io/go-nats"

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/probes"
"github.com/jumptrading/influx-spout/stats"
)

Expand Down Expand Up @@ -98,45 +99,43 @@ func StartHTTPListener(c *config.Config) (*Listener, error) {
// UDP or HTTP, batches them and then publishes them to a NATS
// subject.
type Listener struct {
c *config.Config
nc *nats.Conn
stats *stats.Stats
c *config.Config
nc *nats.Conn
stats *stats.Stats
probes probes.Probes

buf []byte
batchSize int
batchSizeThreshold int

wg sync.WaitGroup
ready chan struct{} // Is close once the listener is listening
stop chan struct{}
}

// Ready returns a channel which is closed once the listener is
// actually listening for incoming data.
func (l *Listener) Ready() <-chan struct{} {
return l.ready
wg sync.WaitGroup
stop chan struct{}
}

// Stop shuts down a running listener. It should be called exactly
// once for every Listener instance.
//
// The probes are first marked as not ready and not alive. The stop channel
// is then closed, the listener's WaitGroup is waited on, and finally the
// NATS connection and the probes are closed.
func (l *Listener) Stop() {
// Flip both probe states before any teardown starts.
l.probes.SetReady(false)
l.probes.SetAlive(false)

// Closing an already-closed channel panics, hence the "exactly once" rule.
// NOTE(review): presumably the listening goroutines watch this channel to
// know when to exit - confirm against the listen loops.
close(l.stop)
// Block until every goroutine tracked by the WaitGroup has called Done.
l.wg.Wait()
// Close the NATS connection only after the wait above has returned.
l.nc.Close()
l.probes.Close()
}

func newListener(c *config.Config) (*Listener, error) {
l := &Listener{
c: c,
ready: make(chan struct{}),
stop: make(chan struct{}),
c: c,
stop: make(chan struct{}),
stats: stats.New(
statReceived,
statSent,
statReadErrors,
statFailedNATSPublish,
),
buf: make([]byte, c.ListenerBatchBytes),
probes: probes.Listen(c.ProbePort),
buf: make([]byte, c.ListenerBatchBytes),

// If more than batchSizeThreshold bytes has been written to
// the current batch buffer, the batch will be sent. We allow
Expand Down Expand Up @@ -190,7 +189,7 @@ func (l *Listener) listenUDP(sc *net.UDPConn) {
l.wg.Done()
}()

close(l.ready)
l.probes.SetReady(true)
for {
sc.SetReadDeadline(time.Now().Add(time.Second))
sz, _, err := sc.ReadFromUDP(l.buf[l.batchSize:])
Expand Down Expand Up @@ -238,7 +237,7 @@ func (l *Listener) listenHTTP(server *http.Server) {
defer l.wg.Done()

go func() {
close(l.ready)
l.probes.SetReady(true)
err := server.ListenAndServe()
if err == nil || err == http.ErrServerClosed {
return
Expand Down
24 changes: 11 additions & 13 deletions listener/listener_medium_test.go
Expand Up @@ -35,8 +35,9 @@ import (
)

const (
natsPort = 44444
listenPort = 44445
natsPort = 44000
listenPort = 44001
probePort = 44002
natsSubject = "listener-test"
natsMonitorSubject = natsSubject + "-monitor"
)
Expand Down Expand Up @@ -81,6 +82,7 @@ func testConfig() *config.Config {
ReadBufferBytes: 4 * 1024 * 1024,
ListenerBatchBytes: 1024 * 1024,
Port: listenPort,
ProbePort: probePort,
}
}

Expand Down Expand Up @@ -181,9 +183,10 @@ loop:
}

func TestHTTPListener(t *testing.T) {
listener, err := StartHTTPListener(testConfig())
conf := testConfig()
listener, err := StartHTTPListener(conf)
require.NoError(t, err)
assertListenerStarted(t, listener)
spouttest.AssertReadyProbe(t, conf.ProbePort)
defer listener.Stop()

listenerCh, unsubListener := subListener(t)
Expand Down Expand Up @@ -228,17 +231,12 @@ func BenchmarkListenerLatency(b *testing.B) {
func startListener(t require.TestingT, conf *config.Config) *Listener {
listener, err := StartListener(conf)
require.NoError(t, err)
assertListenerStarted(t, listener)
return listener
}

func assertListenerStarted(t require.TestingT, listener *Listener) {
select {
case <-listener.Ready():
case <-time.After(spouttest.LongWait):
if !spouttest.CheckReadyProbe(conf.ProbePort) {
listener.Stop()
t.Errorf("listener failed to start up")
t.Errorf("listener not ready")
t.FailNow()
}
return listener
}

// dialListener creates a UDP connection to the listener's inbound port.
Expand Down

0 comments on commit 5aee4d3

Please sign in to comment.